Browse Source

Merge pull request #63141 from reduz/implement-thread-runner

Rémi Verschelde 3 năm trước cách đây
mục cha
commit
d2fa9cbdfd

+ 463 - 0
core/object/worker_thread_pool.cpp

@@ -0,0 +1,463 @@
+/*************************************************************************/
+/*  worker_thread_pool.cpp                                               */
+/*************************************************************************/
+/*                       This file is part of:                           */
+/*                           GODOT ENGINE                                */
+/*                      https://godotengine.org                          */
+/*************************************************************************/
+/* Copyright (c) 2007-2022 Juan Linietsky, Ariel Manzur.                 */
+/* Copyright (c) 2014-2022 Godot Engine contributors (cf. AUTHORS.md).   */
+/*                                                                       */
+/* Permission is hereby granted, free of charge, to any person obtaining */
+/* a copy of this software and associated documentation files (the       */
+/* "Software"), to deal in the Software without restriction, including   */
+/* without limitation the rights to use, copy, modify, merge, publish,   */
+/* distribute, sublicense, and/or sell copies of the Software, and to    */
+/* permit persons to whom the Software is furnished to do so, subject to */
+/* the following conditions:                                             */
+/*                                                                       */
+/* The above copyright notice and this permission notice shall be        */
+/* included in all copies or substantial portions of the Software.       */
+/*                                                                       */
+/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,       */
+/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF    */
+/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/
+/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY  */
+/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,  */
+/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE     */
+/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.                */
+/*************************************************************************/
+
+#include "worker_thread_pool.h"
+
+#include "core/os/os.h"
+
+WorkerThreadPool *WorkerThreadPool::singleton = nullptr;
+
+void WorkerThreadPool::_process_task_queue() {
+	task_mutex.lock();
+	Task *task = task_queue.first()->self();
+	task_queue.remove(task_queue.first());
+	task_mutex.unlock();
+	_process_task(task);
+}
+
+void WorkerThreadPool::_process_task(Task *p_task) {
+	bool low_priority = p_task->low_priority;
+
+	if (p_task->group) {
+		// Handling a group
+		bool do_post = false;
+		if (p_task->native_group_func) {
+			while (true) {
+				uint32_t work_index = p_task->group->index.postincrement();
+				if (work_index >= p_task->group->max) {
+					do_post = work_index == p_task->group->max; // First one reaching max handles semaphore and clean-up.
+					break;
+				}
+				p_task->native_group_func(p_task->native_func_userdata, work_index);
+			}
+
+		} else {
+			Callable::CallError ce;
+			Variant ret;
+			Variant arg;
+			Variant *argptr = &arg;
+			while (true) {
+				uint32_t work_index = p_task->group->index.postincrement();
+				if (work_index >= p_task->group->max) {
+					do_post = work_index == p_task->group->max; // First one reaching max handles semaphore and clean-up.
+					break;
+				}
+				arg = work_index;
+				p_task->callable.call((const Variant **)&argptr, 1, ret, ce);
+			}
+		}
+
+		if (low_priority && use_native_low_priority_threads) {
+			p_task->completed = true;
+			p_task->done_semaphore.post();
+			if (do_post) {
+				p_task->group->completed.set_to(true);
+			}
+		} else {
+			if (do_post) {
+				p_task->group->done_semaphore.post();
+				p_task->group->completed.set_to(true);
+			}
+			uint32_t max_users = p_task->group->tasks_used + 1; // Add 1 because the thread waiting for it is also user. Read before to avoid another thread freeing task after increment.
+			uint32_t finished_users = p_task->group->finished.increment();
+
+			if (finished_users == max_users) {
+				// Get rid of the group, because nobody else is using it.
+				task_mutex.lock();
+				group_allocator.free(p_task->group);
+				task_mutex.unlock();
+			}
+
+			// For groups, tasks get rid of themselves.
+
+			task_mutex.lock();
+			task_allocator.free(p_task);
+			task_mutex.unlock();
+		}
+	} else {
+		if (p_task->native_func) {
+			p_task->native_func(p_task->native_func_userdata);
+		} else {
+			Callable::CallError ce;
+			Variant ret;
+			p_task->callable.call(nullptr, 0, ret, ce);
+		}
+
+		p_task->completed = true;
+		p_task->done_semaphore.post();
+	}
+
+	if (!use_native_low_priority_threads && low_priority) {
+		// A low prioriry task was freed, so see if we can move a pending one to the high priority queue.
+		bool post = false;
+		task_mutex.lock();
+		if (low_priority_task_queue.first()) {
+			Task *low_prio_task = low_priority_task_queue.first()->self();
+			low_priority_task_queue.remove(low_priority_task_queue.first());
+			task_queue.add_last(&low_prio_task->task_elem);
+			post = true;
+		} else {
+			low_priority_threads_used.decrement();
+		}
+		task_mutex.lock();
+		if (post) {
+			task_available_semaphore.post();
+		}
+	}
+}
+
+void WorkerThreadPool::_thread_function(void *p_user) {
+	while (true) {
+		singleton->task_available_semaphore.wait();
+		if (singleton->exit_threads.is_set()) {
+			break;
+		}
+		singleton->_process_task_queue();
+	}
+}
+
+void WorkerThreadPool::_native_low_priority_thread_function(void *p_user) {
+	Task *task = (Task *)p_user;
+	singleton->_process_task(task);
+}
+
+void WorkerThreadPool::_post_task(Task *p_task, bool p_high_priority) {
+	task_mutex.lock();
+	p_task->low_priority = !p_high_priority;
+	if (!p_high_priority && use_native_low_priority_threads) {
+		task_mutex.unlock();
+		p_task->low_priority_thread = native_thread_allocator.alloc();
+		p_task->low_priority_thread->start(_native_low_priority_thread_function, p_task); // Pask task directly to thread.
+
+	} else if (p_high_priority || low_priority_threads_used.get() < max_low_priority_threads) {
+		task_queue.add_last(&p_task->task_elem);
+		if (!p_high_priority) {
+			low_priority_threads_used.increment();
+		}
+		task_mutex.unlock();
+		task_available_semaphore.post();
+	} else {
+		// Too many threads using low priority, must go to queue.
+		low_priority_task_queue.add_last(&p_task->task_elem);
+		task_mutex.unlock();
+	}
+}
+
+WorkerThreadPool::TaskID WorkerThreadPool::add_native_task(void (*p_func)(void *), void *p_userdata, bool p_high_priority, const String &p_description) {
+	task_mutex.lock();
+	// Get a free task
+	Task *task = task_allocator.alloc();
+	TaskID id = last_task++;
+	task->native_func = p_func;
+	task->native_func_userdata = p_userdata;
+	task->description = p_description;
+	tasks.insert(id, task);
+	task_mutex.unlock();
+
+	_post_task(task, p_high_priority);
+
+	return id;
+}
+
+WorkerThreadPool::TaskID WorkerThreadPool::add_task(const Callable &p_action, bool p_high_priority, const String &p_description) {
+	task_mutex.lock();
+	// Get a free task
+	Task *task = task_allocator.alloc();
+	TaskID id = last_task++;
+	task->callable = p_action;
+	task->description = p_description;
+	tasks.insert(id, task);
+	task_mutex.unlock();
+
+	_post_task(task, p_high_priority);
+
+	return id;
+}
+
+bool WorkerThreadPool::is_task_completed(TaskID p_task_id) const {
+	task_mutex.lock();
+	const Task *const *taskp = tasks.getptr(p_task_id);
+	if (!taskp) {
+		task_mutex.unlock();
+		ERR_FAIL_V_MSG(false, "Invalid Task ID"); // Invalid task
+	}
+
+	bool completed = (*taskp)->completed;
+	task_mutex.unlock();
+
+	return completed;
+}
+
+void WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) {
+	task_mutex.lock();
+	Task **taskp = tasks.getptr(p_task_id);
+	if (!taskp) {
+		task_mutex.unlock();
+		ERR_FAIL_MSG("Invalid Task ID"); // Invalid task
+	}
+	Task *task = *taskp;
+
+	if (task->waiting) {
+		String description = task->description;
+		task_mutex.unlock();
+		if (description.is_empty()) {
+			ERR_FAIL_MSG("Another thread is waiting on this task: " + itos(p_task_id)); // Invalid task
+		} else {
+			ERR_FAIL_MSG("Another thread is waiting on this task: " + description + " (" + itos(p_task_id) + ")"); // Invalid task
+		}
+	}
+
+	task->waiting = true;
+
+	task_mutex.unlock();
+
+	if (use_native_low_priority_threads && task->low_priority) {
+		task->low_priority_thread->wait_to_finish();
+		native_thread_allocator.free(task->low_priority_thread);
+	} else {
+		int *index = thread_ids.getptr(Thread::get_caller_id());
+
+		if (index) {
+			// We are an actual process thread, we must not be blocked so continue processing stuff if available.
+			while (true) {
+				if (task->done_semaphore.try_wait()) {
+					// If done, exit
+					break;
+				}
+				if (task_available_semaphore.try_wait()) {
+					// Solve tasks while they are around.
+					_process_task_queue();
+					continue;
+				}
+				OS::get_singleton()->delay_usec(1); // Microsleep, this could be converted to waiting for multiple objects in supported platforms for a bit more performance.
+			}
+		} else {
+			task->done_semaphore.wait();
+		}
+	}
+
+	task_mutex.lock();
+	tasks.erase(p_task_id);
+	task_allocator.free(task);
+	task_mutex.unlock();
+}
+
+WorkerThreadPool::GroupID WorkerThreadPool::add_native_group_task(void (*p_func)(void *, uint32_t), void *p_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) {
+	ERR_FAIL_COND_V(p_elements <= 0, INVALID_TASK_ID);
+	if (p_tasks < 0) {
+		p_tasks = threads.size();
+	}
+
+	task_mutex.lock();
+	Group *group = group_allocator.alloc();
+	GroupID id = last_task++;
+	group->max = p_elements;
+	group->self = id;
+	group->tasks_used = p_tasks;
+	Task **tasks_posted = (Task **)alloca(sizeof(Task *) * p_tasks);
+	for (int i = 0; i < p_tasks; i++) {
+		Task *task = task_allocator.alloc();
+		task->native_group_func = p_func;
+		task->native_func_userdata = p_userdata;
+		task->description = p_description;
+		task->group = group;
+		tasks_posted[i] = task;
+		// No task ID is used.
+	}
+	groups[id] = group;
+	task_mutex.unlock();
+
+	if (!p_high_priority && use_native_low_priority_threads) {
+		group->low_priority_native_tasks.resize(p_tasks);
+	}
+
+	for (int i = 0; i < p_tasks; i++) {
+		_post_task(tasks_posted[i], p_high_priority);
+		if (!p_high_priority && use_native_low_priority_threads) {
+			group->low_priority_native_tasks[i] = tasks_posted[i];
+		}
+	}
+
+	return id;
+}
+
+WorkerThreadPool::GroupID WorkerThreadPool::add_group_task(const Callable &p_action, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) {
+	ERR_FAIL_COND_V(p_elements <= 0, INVALID_TASK_ID);
+	if (p_tasks < 0) {
+		p_tasks = threads.size();
+	}
+
+	task_mutex.lock();
+	Group *group = group_allocator.alloc();
+	GroupID id = last_task++;
+	group->max = p_elements;
+	group->self = id;
+	group->tasks_used = p_tasks;
+	Task **tasks_posted = (Task **)alloca(sizeof(Task *) * p_tasks);
+	for (int i = 0; i < p_tasks; i++) {
+		Task *task = task_allocator.alloc();
+		task->callable = p_action;
+		task->description = p_description;
+		task->group = group;
+		tasks_posted[i] = task;
+		// No task ID is used.
+	}
+	groups[id] = group;
+	task_mutex.unlock();
+
+	if (!p_high_priority && use_native_low_priority_threads) {
+		group->low_priority_native_tasks.resize(p_tasks);
+	}
+
+	for (int i = 0; i < p_tasks; i++) {
+		_post_task(tasks_posted[i], p_high_priority);
+		if (!p_high_priority && use_native_low_priority_threads) {
+			group->low_priority_native_tasks[i] = tasks_posted[i];
+		}
+	}
+	return id;
+}
+
+bool WorkerThreadPool::is_group_task_completed(GroupID p_group) const {
+	task_mutex.lock();
+	const Group *const *groupp = groups.getptr(p_group);
+	if (!groupp) {
+		task_mutex.unlock();
+		ERR_FAIL_V_MSG(false, "Invalid Group ID");
+	}
+	bool completed = (*groupp)->completed.is_set();
+	task_mutex.unlock();
+	return completed;
+}
+
+void WorkerThreadPool::wait_for_group_task_completion(GroupID p_group) {
+	task_mutex.lock();
+	Group **groupp = groups.getptr(p_group);
+	task_mutex.unlock();
+	if (!groupp) {
+		ERR_FAIL_MSG("Invalid Group ID");
+	}
+	Group *group = *groupp;
+
+	if (group->low_priority_native_tasks.size() > 0) {
+		for (uint32_t i = 0; i < group->low_priority_native_tasks.size(); i++) {
+			group->low_priority_native_tasks[i]->low_priority_thread->wait_to_finish();
+			native_thread_allocator.free(group->low_priority_native_tasks[i]->low_priority_thread);
+			task_mutex.lock();
+			task_allocator.free(group->low_priority_native_tasks[i]);
+			task_mutex.unlock();
+		}
+
+		task_mutex.lock();
+		group_allocator.free(group);
+		task_mutex.unlock();
+	} else {
+		group->done_semaphore.wait();
+
+		uint32_t max_users = group->tasks_used + 1; // Add 1 because the thread waiting for it is also user. Read before to avoid another thread freeing task after increment.
+		uint32_t finished_users = group->finished.increment(); // fetch happens before inc, so increment later.
+
+		if (finished_users == max_users) {
+			// All tasks using this group are gone (finished before the group), so clear the gorup too.
+			task_mutex.lock();
+			group_allocator.free(group);
+			task_mutex.unlock();
+		}
+	}
+
+	groups.erase(p_group); // Threads do not access this, so safe to erase here.
+}
+
+void WorkerThreadPool::init(int p_thread_count, bool p_use_native_threads_low_priority, float p_low_priority_task_ratio) {
+	ERR_FAIL_COND(threads.size() > 0);
+	if (p_thread_count < 0) {
+		p_thread_count = OS::get_singleton()->get_default_thread_pool_size();
+	}
+
+	if (p_use_native_threads_low_priority) {
+		max_low_priority_threads = 0;
+	} else {
+		max_low_priority_threads = CLAMP(p_thread_count * p_low_priority_task_ratio, 1, p_thread_count);
+	}
+
+	use_native_low_priority_threads = p_use_native_threads_low_priority;
+
+	threads.resize(p_thread_count);
+
+	for (uint32_t i = 0; i < threads.size(); i++) {
+		threads[i].index = i;
+		threads[i].thread.start(&WorkerThreadPool::_thread_function, &threads[i]);
+		thread_ids.insert(threads[i].thread.get_id(), i);
+	}
+}
+
+void WorkerThreadPool::finish() {
+	if (threads.size() == 0) {
+		return;
+	}
+
+	task_mutex.lock();
+	SelfList<Task> *E = low_priority_task_queue.first();
+	while (E) {
+		print_error("Task waiting was never re-claimed: " + E->self()->description);
+		E = E->next();
+	}
+	task_mutex.unlock();
+
+	exit_threads.set_to(true);
+
+	for (uint32_t i = 0; i < threads.size(); i++) {
+		task_available_semaphore.post();
+	}
+
+	for (uint32_t i = 0; i < threads.size(); i++) {
+		threads[i].thread.wait_to_finish();
+	}
+
+	threads.clear();
+}
+
+void WorkerThreadPool::_bind_methods() {
+	ClassDB::bind_method(D_METHOD("add_task", "action", "high_priority", "description"), &WorkerThreadPool::add_task, DEFVAL(false), DEFVAL(String()));
+	ClassDB::bind_method(D_METHOD("is_task_completed", "task_id"), &WorkerThreadPool::is_task_completed);
+	ClassDB::bind_method(D_METHOD("wait_for_task_completion", "task_id"), &WorkerThreadPool::wait_for_task_completion);
+
+	ClassDB::bind_method(D_METHOD("add_group_task", "action", "elements", "tasks_needed", "high_priority", "description"), &WorkerThreadPool::add_group_task, DEFVAL(-1), DEFVAL(false), DEFVAL(String()));
+	ClassDB::bind_method(D_METHOD("is_group_task_completed", "group_id"), &WorkerThreadPool::is_group_task_completed);
+	ClassDB::bind_method(D_METHOD("wait_for_group_task_completion", "group_id"), &WorkerThreadPool::wait_for_group_task_completion);
+}
+
+WorkerThreadPool::WorkerThreadPool() {
+	singleton = this;
+}
+
+WorkerThreadPool::~WorkerThreadPool() {
+	finish();
+}

+ 146 - 0
core/object/worker_thread_pool.h

@@ -0,0 +1,146 @@
+/*************************************************************************/
+/*  worker_thread_pool.h                                                 */
+/*************************************************************************/
+/*                       This file is part of:                           */
+/*                           GODOT ENGINE                                */
+/*                      https://godotengine.org                          */
+/*************************************************************************/
+/* Copyright (c) 2007-2022 Juan Linietsky, Ariel Manzur.                 */
+/* Copyright (c) 2014-2022 Godot Engine contributors (cf. AUTHORS.md).   */
+/*                                                                       */
+/* Permission is hereby granted, free of charge, to any person obtaining */
+/* a copy of this software and associated documentation files (the       */
+/* "Software"), to deal in the Software without restriction, including   */
+/* without limitation the rights to use, copy, modify, merge, publish,   */
+/* distribute, sublicense, and/or sell copies of the Software, and to    */
+/* permit persons to whom the Software is furnished to do so, subject to */
+/* the following conditions:                                             */
+/*                                                                       */
+/* The above copyright notice and this permission notice shall be        */
+/* included in all copies or substantial portions of the Software.       */
+/*                                                                       */
+/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,       */
+/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF    */
+/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/
+/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY  */
+/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,  */
+/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE     */
+/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.                */
+/*************************************************************************/
+
+#ifndef WORKER_THREAD_POOL_H
+#define WORKER_THREAD_POOL_H
+
+#include "core/os/memory.h"
+#include "core/os/os.h"
+#include "core/os/semaphore.h"
+#include "core/os/thread.h"
+#include "core/templates/local_vector.h"
+#include "core/templates/paged_allocator.h"
+#include "core/templates/rid.h"
+#include "core/templates/safe_refcount.h"
+
+class WorkerThreadPool : public Object {
+	GDCLASS(WorkerThreadPool, Object)
+public:
+	enum {
+		INVALID_TASK_ID = -1
+	};
+
+	typedef int64_t TaskID;
+	typedef int64_t GroupID;
+
+private:
+	struct Task;
+
+	struct Group {
+		GroupID self;
+		SafeNumeric<uint32_t> index;
+		uint32_t max = 0;
+		Semaphore done_semaphore;
+		SafeFlag completed;
+		SafeNumeric<uint32_t> finished;
+		uint32_t tasks_used = 0;
+		TightLocalVector<Task *> low_priority_native_tasks;
+	};
+
+	struct Task {
+		Callable callable;
+		void (*native_func)(void *) = nullptr;
+		void (*native_group_func)(void *, uint32_t) = nullptr;
+		void *native_func_userdata = nullptr;
+		String description;
+		Semaphore done_semaphore;
+		bool completed = false;
+		Group *group = nullptr;
+		SelfList<Task> task_elem;
+		bool waiting = false; // Waiting for completion
+		bool low_priority = false;
+		Thread *low_priority_thread = nullptr;
+		Task() :
+				task_elem(this) {}
+	};
+
+	PagedAllocator<Task> task_allocator;
+	PagedAllocator<Group> group_allocator;
+	PagedAllocator<Thread> native_thread_allocator;
+
+	SelfList<Task>::List low_priority_task_queue;
+	SelfList<Task>::List task_queue;
+
+	Mutex task_mutex;
+	Semaphore task_available_semaphore;
+
+	struct ThreadData {
+		uint32_t index;
+		Thread thread;
+	};
+
+	TightLocalVector<ThreadData> threads;
+	SafeFlag exit_threads;
+
+	HashMap<Thread::ID, int> thread_ids;
+	HashMap<TaskID, Task *> tasks;
+	HashMap<GroupID, Group *> groups;
+
+	bool use_native_low_priority_threads = false;
+	uint32_t max_low_priority_threads = 0;
+	SafeNumeric<uint32_t> low_priority_threads_used;
+
+	uint64_t last_task = 1;
+
+	static void _thread_function(void *p_user);
+	static void _native_low_priority_thread_function(void *p_user);
+
+	void _process_task_queue();
+	void _process_task(Task *task);
+
+	void _post_task(Task *p_task, bool p_high_priority);
+
+	static WorkerThreadPool *singleton;
+
+protected:
+	static void _bind_methods();
+
+public:
+	TaskID add_native_task(void (*p_func)(void *), void *p_userdata, bool p_high_priority = false, const String &p_description = String());
+	TaskID add_task(const Callable &p_action, bool p_high_priority = false, const String &p_description = String());
+
+	bool is_task_completed(TaskID p_task_id) const;
+	void wait_for_task_completion(TaskID p_task_id);
+
+	GroupID add_native_group_task(void (*p_func)(void *, uint32_t), void *p_userdata, int p_elements, int p_tasks = -1, bool p_high_priority = false, const String &p_description = String());
+	GroupID add_group_task(const Callable &p_action, int p_elements, int p_tasks = -1, bool p_high_priority = false, const String &p_description = String());
+	bool is_group_task_completed(GroupID p_group) const;
+	void wait_for_group_task_completion(GroupID p_group);
+
+	_FORCE_INLINE_ int get_thread_count() const { return threads.size(); }
+
+	static WorkerThreadPool *get_singleton() { return singleton; }
+	void init(int p_thread_count = -1, bool p_use_native_threads_low_priority = true, float p_low_priority_task_ratio = 0.3);
+	void finish();
+	WorkerThreadPool();
+	~WorkerThreadPool();
+};
+
+#endif // WORKER_THREAD_POOL_H

+ 20 - 1
core/register_core_types.cpp

@@ -74,6 +74,7 @@
 #include "core/object/class_db.h"
 #include "core/object/script_language_extension.h"
 #include "core/object/undo_redo.h"
+#include "core/object/worker_thread_pool.h"
 #include "core/os/main_loop.h"
 #include "core/os/time.h"
 #include "core/string/optimized_translation.h"
@@ -101,6 +102,8 @@ static IP *ip = nullptr;
 static core_bind::Geometry2D *_geometry_2d = nullptr;
 static core_bind::Geometry3D *_geometry_3d = nullptr;
 
+static WorkerThreadPool *worker_thread_pool = nullptr;
+
 extern Mutex _global_mutex;
 
 static NativeExtensionManager *native_extension_manager = nullptr;
@@ -189,6 +192,8 @@ void register_core_types() {
 	GDREGISTER_CLASS(PacketPeerUDP);
 	GDREGISTER_CLASS(UDPServer);
 
+	GDREGISTER_ABSTRACT_CLASS(WorkerThreadPool);
+
 	ClassDB::register_custom_instance_class<HTTPClient>();
 
 	// Crypto
@@ -271,6 +276,8 @@ void register_core_types() {
 
 	GDREGISTER_NATIVE_STRUCT(AudioFrame, "float left;float right");
 	GDREGISTER_NATIVE_STRUCT(ScriptLanguageExtensionProfilingInfo, "StringName signature;uint64_t call_count;uint64_t total_time;uint64_t self_time");
+
+	worker_thread_pool = memnew(WorkerThreadPool);
 }
 
 void register_core_settings() {
@@ -279,9 +286,18 @@ void register_core_settings() {
 	ProjectSettings::get_singleton()->set_custom_property_info("network/limits/tcp/connect_timeout_seconds", PropertyInfo(Variant::INT, "network/limits/tcp/connect_timeout_seconds", PROPERTY_HINT_RANGE, "1,1800,1"));
 	GLOBAL_DEF_RST("network/limits/packet_peer_stream/max_buffer_po2", (16));
 	ProjectSettings::get_singleton()->set_custom_property_info("network/limits/packet_peer_stream/max_buffer_po2", PropertyInfo(Variant::INT, "network/limits/packet_peer_stream/max_buffer_po2", PROPERTY_HINT_RANGE, "0,64,1,or_greater"));
-
 	GLOBAL_DEF("network/ssl/certificate_bundle_override", "");
 	ProjectSettings::get_singleton()->set_custom_property_info("network/ssl/certificate_bundle_override", PropertyInfo(Variant::STRING, "network/ssl/certificate_bundle_override", PROPERTY_HINT_FILE, "*.crt"));
+
+	int worker_threads = GLOBAL_DEF("threading/worker_pool/max_threads", -1);
+	bool low_priority_use_system_threads = GLOBAL_DEF("threading/worker_pool/use_system_threads_for_low_priority_tasks", true);
+	float low_property_ratio = GLOBAL_DEF("threading/worker_pool/low_priority_thread_ratio", 0.3);
+
+	if (Engine::get_singleton()->is_editor_hint() || Engine::get_singleton()->is_project_manager_hint()) {
+		worker_thread_pool->init();
+	} else {
+		worker_thread_pool->init(worker_threads, low_priority_use_system_threads, low_property_ratio);
+	}
 }
 
 void register_core_singletons() {
@@ -319,6 +335,7 @@ void register_core_singletons() {
 	Engine::get_singleton()->add_singleton(Engine::Singleton("Time", Time::get_singleton()));
 	Engine::get_singleton()->add_singleton(Engine::Singleton("NativeExtensionManager", NativeExtensionManager::get_singleton()));
 	Engine::get_singleton()->add_singleton(Engine::Singleton("ResourceUID", ResourceUID::get_singleton()));
+	Engine::get_singleton()->add_singleton(Engine::Singleton("WorkerThreadPool", worker_thread_pool));
 }
 
 void register_core_extensions() {
@@ -350,6 +367,8 @@ void unregister_core_types() {
 	memdelete(_geometry_2d);
 	memdelete(_geometry_3d);
 
+	memdelete(worker_thread_pool);
+
 	ResourceLoader::remove_resource_format_loader(resource_format_image);
 	resource_format_image.unref();
 

+ 1 - 0
core/templates/safe_refcount.h

@@ -111,6 +111,7 @@ public:
 			if (tmp >= p_value) {
 				return tmp; // already greater, or equal
 			}
+
 			if (value.compare_exchange_weak(tmp, p_value, std::memory_order_acq_rel)) {
 				return p_value;
 			}

+ 2 - 0
doc/classes/@GlobalScope.xml

@@ -1222,6 +1222,8 @@
 		<member name="VisualScriptCustomNodes" type="VisualScriptCustomNodes" setter="" getter="">
 			The [VisualScriptCustomNodes] singleton.
 		</member>
+		<member name="WorkerThreadPool" type="WorkerThreadPool" setter="" getter="">
+		</member>
 		<member name="XRServer" type="XRServer" setter="" getter="">
 			The [XRServer] singleton.
 		</member>

+ 6 - 0
doc/classes/ProjectSettings.xml

@@ -1986,6 +1986,12 @@
 		</member>
 		<member name="rendering/vulkan/staging_buffer/texture_upload_region_size_px" type="int" setter="" getter="" default="64">
 		</member>
+		<member name="threading/worker_pool/low_priority_thread_ratio" type="float" setter="" getter="" default="0.3">
+		</member>
+		<member name="threading/worker_pool/max_threads" type="int" setter="" getter="" default="-1">
+		</member>
+		<member name="threading/worker_pool/use_system_threads_for_low_priority_tasks" type="bool" setter="" getter="" default="true">
+		</member>
 		<member name="xr/openxr/default_action_map" type="String" setter="" getter="" default="&quot;res://openxr_action_map.tres&quot;">
 			Action map configuration to load by default.
 		</member>

+ 53 - 0
doc/classes/WorkerThreadPool.xml

@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<class name="WorkerThreadPool" inherits="Object" version="4.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="../class.xsd">
+	<brief_description>
+	</brief_description>
+	<description>
+	</description>
+	<tutorials>
+	</tutorials>
+	<methods>
+		<method name="add_group_task">
+			<return type="int" />
+			<argument index="0" name="action" type="Callable" />
+			<argument index="1" name="elements" type="int" />
+			<argument index="2" name="tasks_needed" type="int" default="-1" />
+			<argument index="3" name="high_priority" type="bool" default="false" />
+			<argument index="4" name="description" type="String" default="&quot;&quot;" />
+			<description>
+			</description>
+		</method>
+		<method name="add_task">
+			<return type="int" />
+			<argument index="0" name="action" type="Callable" />
+			<argument index="1" name="high_priority" type="bool" default="false" />
+			<argument index="2" name="description" type="String" default="&quot;&quot;" />
+			<description>
+			</description>
+		</method>
+		<method name="is_group_task_completed" qualifiers="const">
+			<return type="bool" />
+			<argument index="0" name="group_id" type="int" />
+			<description>
+			</description>
+		</method>
+		<method name="is_task_completed" qualifiers="const">
+			<return type="bool" />
+			<argument index="0" name="task_id" type="int" />
+			<description>
+			</description>
+		</method>
+		<method name="wait_for_group_task_completion">
+			<return type="void" />
+			<argument index="0" name="group_id" type="int" />
+			<description>
+			</description>
+		</method>
+		<method name="wait_for_task_completion">
+			<return type="void" />
+			<argument index="0" name="task_id" type="int" />
+			<description>
+			</description>
+		</method>
+	</methods>
+</class>

+ 2 - 0
main/main.cpp

@@ -410,6 +410,8 @@ Error Main::test_setup() {
 			String("Please include this when reporting the bug on https://github.com/godotengine/godot/issues"));
 	GLOBAL_DEF_RST("rendering/occlusion_culling/bvh_build_quality", 2);
 
+	register_core_settings(); //here globals are present
+
 	translation_server = memnew(TranslationServer);
 	tsman = memnew(TextServerManager);
 

+ 158 - 0
tests/core/threads/test_worker_thread_pool.h

@@ -0,0 +1,158 @@
+/*************************************************************************/
+/*  test_worker_thread_pool.h                                            */
+/*************************************************************************/
+/*                       This file is part of:                           */
+/*                           GODOT ENGINE                                */
+/*                      https://godotengine.org                          */
+/*************************************************************************/
+/* Copyright (c) 2007-2022 Juan Linietsky, Ariel Manzur.                 */
+/* Copyright (c) 2014-2022 Godot Engine contributors (cf. AUTHORS.md).   */
+/*                                                                       */
+/* Permission is hereby granted, free of charge, to any person obtaining */
+/* a copy of this software and associated documentation files (the       */
+/* "Software"), to deal in the Software without restriction, including   */
+/* without limitation the rights to use, copy, modify, merge, publish,   */
+/* distribute, sublicense, and/or sell copies of the Software, and to    */
+/* permit persons to whom the Software is furnished to do so, subject to */
+/* the following conditions:                                             */
+/*                                                                       */
+/* The above copyright notice and this permission notice shall be        */
+/* included in all copies or substantial portions of the Software.       */
+/*                                                                       */
+/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,       */
+/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF    */
+/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/
+/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY  */
+/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,  */
+/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE     */
+/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.                */
+/*************************************************************************/
+
+#ifndef TEST_WORKER_THREAD_POOL_H
+#define TEST_WORKER_THREAD_POOL_H
+
+#include "core/object/worker_thread_pool.h"
+
+#include "tests/test_macros.h"
+
+namespace TestWorkerThreadPool {
+
+int u32scmp(const char32_t *l, const char32_t *r) {
+	for (; *l == *r && *l && *r; l++, r++) {
+		// Continue.
+	}
+	return *l - *r;
+}
+
+static void static_test(void *p_arg) {
+	SafeNumeric<uint32_t> *counter = (SafeNumeric<uint32_t> *)p_arg;
+	counter->increment();
+}
+
+static SafeNumeric<uint32_t> callable_counter;
+
+static void static_callable_test() {
+	callable_counter.increment();
+}
+
+TEST_CASE("[WorkerThreadPool] Process 256 threads using native task") {
+	const int count = 256;
+	SafeNumeric<uint32_t> counter;
+	WorkerThreadPool::TaskID tasks[count];
+	for (int i = 0; i < count; i++) {
+		tasks[i] = WorkerThreadPool::get_singleton()->add_native_task(static_test, &counter, true);
+	}
+	for (int i = 0; i < count; i++) {
+		WorkerThreadPool::get_singleton()->wait_for_task_completion(tasks[i]);
+	}
+
+	CHECK(counter.get() == count);
+}
+
+TEST_CASE("[WorkerThreadPool] Process 256 threads using native low priority") {
+	const int count = 256;
+	SafeNumeric<uint32_t> counter = SafeNumeric<uint32_t>(0);
+	WorkerThreadPool::TaskID tasks[count];
+	for (int i = 0; i < count; i++) {
+		tasks[i] = WorkerThreadPool::get_singleton()->add_native_task(static_test, &counter, false);
+	}
+	for (int i = 0; i < count; i++) {
+		WorkerThreadPool::get_singleton()->wait_for_task_completion(tasks[i]);
+	}
+
+	CHECK(counter.get() == count);
+}
+
+TEST_CASE("[WorkerThreadPool] Process 256 threads using callable") {
+	const int count = 256;
+	WorkerThreadPool::TaskID tasks[count];
+	callable_counter.set(0);
+	for (int i = 0; i < count; i++) {
+		tasks[i] = WorkerThreadPool::get_singleton()->add_task(callable_mp_static(static_callable_test), true);
+	}
+	for (int i = 0; i < count; i++) {
+		WorkerThreadPool::get_singleton()->wait_for_task_completion(tasks[i]);
+	}
+
+	CHECK(callable_counter.get() == count);
+}
+
+TEST_CASE("[WorkerThreadPool] Process 256 threads using callable low priority") {
+	const int count = 256;
+	WorkerThreadPool::TaskID tasks[count];
+	callable_counter.set(0);
+	for (int i = 0; i < count; i++) {
+		tasks[i] = WorkerThreadPool::get_singleton()->add_task(callable_mp_static(static_callable_test), false);
+	}
+	for (int i = 0; i < count; i++) {
+		WorkerThreadPool::get_singleton()->wait_for_task_completion(tasks[i]);
+	}
+
+	CHECK(callable_counter.get() == count);
+}
+
+static void static_group_test(void *p_arg, uint32_t p_index) {
+	SafeNumeric<uint32_t> *counter = (SafeNumeric<uint32_t> *)p_arg;
+	counter->exchange_if_greater(p_index);
+}
+
+TEST_CASE("[WorkerThreadPool] Process 256 elements on native task group") {
+	const int count = 256;
+	SafeNumeric<uint32_t> counter;
+	WorkerThreadPool::GroupID group = WorkerThreadPool::get_singleton()->add_native_group_task(static_group_test, &counter, count, -1, true);
+	WorkerThreadPool::get_singleton()->wait_for_group_task_completion(group);
+	CHECK(counter.get() == count - 1);
+}
+
+TEST_CASE("[WorkerThreadPool] Process 256 elements on native task group low priority") {
+	const int count = 256;
+	SafeNumeric<uint32_t> counter;
+	WorkerThreadPool::GroupID group = WorkerThreadPool::get_singleton()->add_native_group_task(static_group_test, &counter, count, -1, false);
+	WorkerThreadPool::get_singleton()->wait_for_group_task_completion(group);
+	CHECK(counter.get() == count - 1);
+}
+
+static SafeNumeric<uint32_t> callable_group_counter;
+
+static void static_callable_group_test(uint32_t p_index) {
+	callable_group_counter.exchange_if_greater(p_index);
+}
+
+TEST_CASE("[WorkerThreadPool] Process 256 elements on native task group") {
+	const int count = 256;
+	WorkerThreadPool::GroupID group = WorkerThreadPool::get_singleton()->add_group_task(callable_mp_static(static_callable_group_test), count, -1, true);
+	WorkerThreadPool::get_singleton()->wait_for_group_task_completion(group);
+	CHECK(callable_group_counter.get() == count - 1);
+}
+
+TEST_CASE("[WorkerThreadPool] Process 256 elements on native task group low priority") {
+	const int count = 256;
+	callable_group_counter.set(0);
+	WorkerThreadPool::GroupID group = WorkerThreadPool::get_singleton()->add_group_task(callable_mp_static(static_callable_group_test), count, -1, false);
+	WorkerThreadPool::get_singleton()->wait_for_group_task_completion(group);
+	CHECK(callable_group_counter.get() == count - 1);
+}
+
+} // namespace TestWorkerThreadPool
+
+#endif // TEST_WORKER_THREAD_POOL_H

+ 1 - 0
tests/test_main.cpp

@@ -70,6 +70,7 @@
 #include "tests/core/test_crypto.h"
 #include "tests/core/test_hashing_context.h"
 #include "tests/core/test_time.h"
+#include "tests/core/threads/test_worker_thread_pool.h"
 #include "tests/core/variant/test_array.h"
 #include "tests/core/variant/test_dictionary.h"
 #include "tests/core/variant/test_variant.h"