@@ -32,6 +32,13 @@
 
 #include "core/os/os.h"
 
+void WorkerThreadPool::Task::free_template_userdata() {
+	ERR_FAIL_COND(!template_userdata);
+	ERR_FAIL_COND(native_func_userdata == nullptr);
+	BaseTemplateUserdata *btu = (BaseTemplateUserdata *)native_func_userdata;
+	memdelete(btu);
+}
+
 WorkerThreadPool *WorkerThreadPool::singleton = nullptr;
 
 void WorkerThreadPool::_process_task_queue() {
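
Note on the hunk above: `Task::free_template_userdata()` casts `native_func_userdata` back to `BaseTemplateUserdata *`, which is only sound because template tasks store the same pointer in both `template_userdata` and `native_func_userdata`. A minimal standalone sketch of the type-erasure idea, with illustrative names (only `callback()`/`callback_indexed()` mirror this diff; the real class lives in `worker_thread_pool.h`):

```cpp
#include <cstdint>
#include <cstdio>
#include <utility>

// Base class erases the concrete callable type behind two virtuals.
struct BaseUserdata {
	virtual void callback() {}
	virtual void callback_indexed(uint32_t p_index) { (void)p_index; }
	virtual ~BaseUserdata() {}
};

// One concrete wrapper per callable type; the pool only ever sees BaseUserdata *.
template <typename F>
struct GroupUserdata : public BaseUserdata {
	F fn;
	explicit GroupUserdata(F p_fn) :
			fn(std::move(p_fn)) {}
	void callback_indexed(uint32_t p_index) override { fn(p_index); }
};

template <typename F>
BaseUserdata *make_group_userdata(F p_fn) {
	return new GroupUserdata<F>(std::move(p_fn));
}

int main() {
	// The pool would keep this one pointer in both template_userdata and
	// native_func_userdata, which is what makes the cast in
	// free_template_userdata() valid.
	BaseUserdata *ud = make_group_userdata([](uint32_t i) { std::printf("element %u\n", i); });
	ud->callback_indexed(0);
	ud->callback_indexed(1);
	delete ud; // The engine uses memdelete(); plain delete suffices in this sketch.
	return 0;
}
```
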
@@ -48,30 +55,36 @@ void WorkerThreadPool::_process_task(Task *p_task) {
 	if (p_task->group) {
 		// Handling a group
 		bool do_post = false;
-		if (p_task->native_group_func) {
-			while (true) {
-				uint32_t work_index = p_task->group->index.postincrement();
-				if (work_index >= p_task->group->max) {
-					do_post = work_index == p_task->group->max; // First one reaching max handles semaphore and clean-up.
-					break;
-				}
-				p_task->native_group_func(p_task->native_func_userdata, work_index);
-			}
+		Callable::CallError ce;
+		Variant ret;
+		Variant arg;
+		Variant *argptr = &arg;
 
-		} else {
-			Callable::CallError ce;
-			Variant ret;
-			Variant arg;
-			Variant *argptr = &arg;
-			while (true) {
-				uint32_t work_index = p_task->group->index.postincrement();
-				if (work_index >= p_task->group->max) {
-					do_post = work_index == p_task->group->max; // First one reaching max handles semaphore and clean-up.
-					break;
-				}
+		while (true) {
+			uint32_t work_index = p_task->group->index.postincrement();
+
+			if (work_index >= p_task->group->max) {
+				break;
+			}
+			if (p_task->native_group_func) {
+				p_task->native_group_func(p_task->native_func_userdata, work_index);
+			} else if (p_task->template_userdata) {
+				p_task->template_userdata->callback_indexed(work_index);
+			} else {
 				arg = work_index;
 				p_task->callable.call((const Variant **)&argptr, 1, ret, ce);
 			}
+
+			// This is the only way to ensure posting is done when all tasks are really complete.
+			uint32_t completed_amount = p_task->group->completed_index.increment();
+
+			if (completed_amount == p_task->group->max) {
+				do_post = true;
+			}
+		}
+
+		if (do_post && p_task->template_userdata) {
+			memdelete(p_task->template_userdata); // This is no longer needed at this point, so get rid of it.
 		}
 
 		if (low_priority && use_native_low_priority_threads) {
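
Note on the rewritten loop above: the old code posted the semaphore as soon as a thread *claimed* index `max`, but claiming the last element says nothing about earlier elements still executing on other threads. The rewrite therefore keeps two counters: `index` hands out work, and `completed_index` decides who posts. A standalone sketch of the two-counter scheme, with `std::atomic` standing in for Godot's `SafeNumeric` (note `SafeNumeric::increment()` returns the new value, hence the `+ 1`):

```cpp
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <thread>
#include <vector>

int main() {
	const uint32_t max = 1000;
	std::atomic<uint32_t> index{0}; // Next element to claim (group->index).
	std::atomic<uint32_t> completed{0}; // Elements fully finished (group->completed_index).
	std::atomic<bool> posted{false}; // Stands in for group->done_semaphore.post().

	auto worker = [&]() {
		while (true) {
			uint32_t work_index = index.fetch_add(1); // postincrement()
			if (work_index >= max) {
				break;
			}
			// ... process element work_index here ...
			uint32_t completed_amount = completed.fetch_add(1) + 1; // increment()
			if (completed_amount == max) {
				posted.store(true); // Only the thread finishing the LAST element posts.
			}
		}
	};

	std::vector<std::thread> threads;
	for (int i = 0; i < 4; i++) {
		threads.emplace_back(worker);
	}
	for (std::thread &t : threads) {
		t.join();
	}
	// posted can only have become true after every element finished.
	std::printf("posted=%d completed=%u\n", posted.load() ? 1 : 0, completed.load());
	return 0;
}
```
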
@@ -104,6 +117,9 @@ void WorkerThreadPool::_process_task(Task *p_task) {
 	} else {
 		if (p_task->native_func) {
 			p_task->native_func(p_task->native_func_userdata);
+		} else if (p_task->template_userdata) {
+			p_task->template_userdata->callback();
+			memdelete(p_task->template_userdata);
 		} else {
 			Callable::CallError ce;
 			Variant ret;
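
Note on the single-task path above: ownership of the template userdata is simpler here than in the group path. Whichever worker runs the one-shot callback also frees the payload immediately, whereas a group's shared payload is freed only by the thread that sets `do_post`. Continuing the illustrative `BaseUserdata` sketch from earlier (not the pool's real code):

```cpp
// Single tasks are run-once: the executing worker both invokes and frees
// the type-erased payload, mirroring callback() + memdelete() above.
void process_single_task(BaseUserdata *p_userdata) {
	p_userdata->callback();
	delete p_userdata; // memdelete() in the engine; "run, then free" is the contract.
}
```
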
@@ -171,13 +187,19 @@ void WorkerThreadPool::_post_task(Task *p_task, bool p_high_priority) {
 }
 
 WorkerThreadPool::TaskID WorkerThreadPool::add_native_task(void (*p_func)(void *), void *p_userdata, bool p_high_priority, const String &p_description) {
+	return _add_task(Callable(), p_func, p_userdata, nullptr, p_high_priority, p_description);
+}
+
+WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description) {
 	task_mutex.lock();
 	// Get a free task
 	Task *task = task_allocator.alloc();
 	TaskID id = last_task++;
+	task->callable = p_callable;
 	task->native_func = p_func;
 	task->native_func_userdata = p_userdata;
 	task->description = p_description;
+	task->template_userdata = p_template_userdata;
 	tasks.insert(id, task);
 	task_mutex.unlock();
 
@@ -187,18 +209,7 @@ WorkerThreadPool::TaskID WorkerThreadPool::add_native_task(void (*p_func)(void *
 }
 
 WorkerThreadPool::TaskID WorkerThreadPool::add_task(const Callable &p_action, bool p_high_priority, const String &p_description) {
-	task_mutex.lock();
-	// Get a free task
-	Task *task = task_allocator.alloc();
-	TaskID id = last_task++;
-	task->callable = p_action;
-	task->description = p_description;
-	tasks.insert(id, task);
-	task_mutex.unlock();
-
-	_post_task(task, p_high_priority);
-
-	return id;
+	return _add_task(p_action, nullptr, nullptr, nullptr, p_high_priority, p_description);
 }
 
 bool WorkerThreadPool::is_task_completed(TaskID p_task_id) const {
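
Note on the two hunks above: `add_native_task()` and `add_task()` now funnel into a single `_add_task()` that fills in whichever payload it was given (`p_callable`, `p_func` plus `p_userdata`, or `p_template_userdata`); `_process_task()` then dispatches on which one is set. A hedged usage sketch of the two public entry points from engine-internal code (`MyData` and `my_native_func` are placeholders, not engine API):

```cpp
// Assumes engine-internal code with worker_thread_pool.h included.
struct MyData {
	int value = 0;
};

static void my_native_func(void *p_userdata) {
	MyData *data = static_cast<MyData *>(p_userdata);
	data->value++;
}

void example(WorkerThreadPool *p_pool, const Callable &p_callable) {
	MyData data;
	// Both calls end up in _add_task(); only the payload kind differs.
	WorkerThreadPool::TaskID native_id = p_pool->add_native_task(&my_native_func, &data, true, "Native example");
	WorkerThreadPool::TaskID script_id = p_pool->add_task(p_callable, false, "Callable example");
	p_pool->wait_for_task_completion(native_id);
	p_pool->wait_for_task_completion(script_id);
}
```
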
@@ -269,8 +280,8 @@ void WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) {
 	task_mutex.unlock();
 }
 
-WorkerThreadPool::GroupID WorkerThreadPool::add_native_group_task(void (*p_func)(void *, uint32_t), void *p_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) {
-	ERR_FAIL_COND_V(p_elements <= 0, INVALID_TASK_ID);
+WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_callable, void (*p_func)(void *, uint32_t), void *p_userdata, BaseTemplateUserdata *p_template_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) {
+	ERR_FAIL_COND_V(p_elements < 0, INVALID_TASK_ID);
 	if (p_tasks < 0) {
 		p_tasks = threads.size();
 	}
@@ -280,17 +291,34 @@ WorkerThreadPool::GroupID WorkerThreadPool::add_native_group_task(void (*p_func)
 	GroupID id = last_task++;
 	group->max = p_elements;
 	group->self = id;
-	group->tasks_used = p_tasks;
-	Task **tasks_posted = (Task **)alloca(sizeof(Task *) * p_tasks);
-	for (int i = 0; i < p_tasks; i++) {
-		Task *task = task_allocator.alloc();
-		task->native_group_func = p_func;
-		task->native_func_userdata = p_userdata;
-		task->description = p_description;
-		task->group = group;
-		tasks_posted[i] = task;
-		// No task ID is used.
+
+	Task **tasks_posted = nullptr;
+	if (p_elements == 0) {
+		// Should really not be called with zero elements, but at least it should work.
+		group->completed.set_to(true);
+		group->done_semaphore.post();
+		group->tasks_used = 0;
+		p_tasks = 0;
+		if (p_template_userdata) {
+			memdelete(p_template_userdata);
+		}
+
+	} else {
+		group->tasks_used = p_tasks;
+		tasks_posted = (Task **)alloca(sizeof(Task *) * p_tasks);
+		for (int i = 0; i < p_tasks; i++) {
+			Task *task = task_allocator.alloc();
+			task->native_group_func = p_func;
+			task->native_func_userdata = p_userdata;
+			task->description = p_description;
+			task->group = group;
+			task->callable = p_callable;
+			task->template_userdata = p_template_userdata;
+			tasks_posted[i] = task;
+			// No task ID is used.
+		}
 	}
+
 	groups[id] = group;
 	task_mutex.unlock();
 
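
Note on the hunk above: relaxing the guard from `p_elements <= 0` to `p_elements < 0` means a zero-element group is now legal. Since no worker will ever run, the group has to be completed inline: mark it done, post the semaphore so a later `wait_for_group_task_completion()` returns instead of deadlocking, and free the template userdata that would otherwise leak. A standalone C++20 sketch of why the early-out must still post:

```cpp
#include <cstdio>
#include <semaphore>

struct FakeGroup {
	std::binary_semaphore done_semaphore{0}; // Stands in for group->done_semaphore.
	bool completed = false;
};

void add_group(FakeGroup &p_group, int p_elements) {
	if (p_elements == 0) {
		// Mirrors the diff: complete immediately and post, so waiters never block.
		p_group.completed = true;
		p_group.done_semaphore.release();
		return;
	}
	// ... otherwise tasks would be allocated and posted here ...
}

int main() {
	FakeGroup group;
	add_group(group, 0);
	group.done_semaphore.acquire(); // Returns immediately instead of hanging forever.
	std::printf("group completed: %d\n", group.completed ? 1 : 0);
	return 0;
}
```
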
@@ -308,43 +336,25 @@ WorkerThreadPool::GroupID WorkerThreadPool::add_native_group_task(void (*p_func)
 	return id;
 }
 
+WorkerThreadPool::GroupID WorkerThreadPool::add_native_group_task(void (*p_func)(void *, uint32_t), void *p_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) {
+	return _add_group_task(Callable(), p_func, p_userdata, nullptr, p_elements, p_tasks, p_high_priority, p_description);
+}
+
 WorkerThreadPool::GroupID WorkerThreadPool::add_group_task(const Callable &p_action, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) {
-	ERR_FAIL_COND_V(p_elements <= 0, INVALID_TASK_ID);
-	if (p_tasks < 0) {
-		p_tasks = threads.size();
-	}
+	return _add_group_task(p_action, nullptr, nullptr, nullptr, p_elements, p_tasks, p_high_priority, p_description);
+}
 
+uint32_t WorkerThreadPool::get_group_processed_element_count(GroupID p_group) const {
 	task_mutex.lock();
-	Group *group = group_allocator.alloc();
-	GroupID id = last_task++;
-	group->max = p_elements;
-	group->self = id;
-	group->tasks_used = p_tasks;
-	Task **tasks_posted = (Task **)alloca(sizeof(Task *) * p_tasks);
-	for (int i = 0; i < p_tasks; i++) {
-		Task *task = task_allocator.alloc();
-		task->callable = p_action;
-		task->description = p_description;
-		task->group = group;
-		tasks_posted[i] = task;
-		// No task ID is used.
+	const Group *const *groupp = groups.getptr(p_group);
+	if (!groupp) {
+		task_mutex.unlock();
+		ERR_FAIL_V_MSG(0, "Invalid Group ID.");
 	}
-	groups[id] = group;
+	uint32_t elements = (*groupp)->completed_index.get();
 	task_mutex.unlock();
-
-	if (!p_high_priority && use_native_low_priority_threads) {
-		group->low_priority_native_tasks.resize(p_tasks);
-	}
-
-	for (int i = 0; i < p_tasks; i++) {
-		_post_task(tasks_posted[i], p_high_priority);
-		if (!p_high_priority && use_native_low_priority_threads) {
-			group->low_priority_native_tasks[i] = tasks_posted[i];
-		}
-	}
-	return id;
+	return elements;
 }
-
 bool WorkerThreadPool::is_group_task_completed(GroupID p_group) const {
 	task_mutex.lock();
 	const Group *const *groupp = groups.getptr(p_group);
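
Note on the hunk above: `get_group_processed_element_count()` reads the group's `completed_index` under `task_mutex`, so callers get a monotonic count of *finished* elements, which is exactly what a progress display wants. A hedged engine-side usage sketch (`fill_row` and `report_progress` are placeholders, not engine API):

```cpp
#include <cstdio>

static void report_progress(float p_ratio) {
	// Placeholder UI hook; real code might update a progress dialog here.
	std::printf("progress: %.0f%%\n", p_ratio * 100.0f);
}

static void fill_row(void *p_userdata, uint32_t p_index) {
	float *rows = static_cast<float *>(p_userdata);
	rows[p_index] = float(p_index) * 0.5f;
}

void bake_with_progress(WorkerThreadPool *p_pool, float *p_rows, int p_row_count) {
	WorkerThreadPool::GroupID gid = p_pool->add_native_group_task(&fill_row, p_rows, p_row_count, -1, true, "Bake rows");
	while (!p_pool->is_group_task_completed(gid)) {
		// Poll the new counter; a real caller would yield or sleep between polls.
		uint32_t done = p_pool->get_group_processed_element_count(gid);
		report_progress(float(done) / float(p_row_count));
	}
	p_pool->wait_for_group_task_completion(gid);
}
```
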
@@ -451,6 +461,7 @@ void WorkerThreadPool::_bind_methods() {
 
 	ClassDB::bind_method(D_METHOD("add_group_task", "action", "elements", "tasks_needed", "high_priority", "description"), &WorkerThreadPool::add_group_task, DEFVAL(-1), DEFVAL(false), DEFVAL(String()));
 	ClassDB::bind_method(D_METHOD("is_group_task_completed", "group_id"), &WorkerThreadPool::is_group_task_completed);
+	ClassDB::bind_method(D_METHOD("get_group_processed_element_count", "group_id"), &WorkerThreadPool::get_group_processed_element_count);
 	ClassDB::bind_method(D_METHOD("wait_for_group_task_completion", "group_id"), &WorkerThreadPool::wait_for_group_task_completion);
 }
 