|
@@ -32,7 +32,7 @@ public:
|
|
|
, m_hive(hive)
|
|
, m_hive(hive)
|
|
|
{
|
|
{
|
|
|
ANKI_ASSERT(hive);
|
|
ANKI_ASSERT(hive);
|
|
|
- m_thread.start(this, threadCallback, (pinToCores) ? m_id : -1);
|
|
|
|
|
|
|
+ m_thread.start(this, threadCallback, (pinToCores) ? I(m_id) : -1);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
private:
|
|
private:
|
|
@@ -46,7 +46,7 @@ private:
|
|
|
}
|
|
}
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
-class ThreadHive::Task
|
|
|
|
|
|
|
+class ThreadHive::Task : public NonCopyable
|
|
|
{
|
|
{
|
|
|
public:
|
|
public:
|
|
|
Task* m_next; ///< Next in the list.
|
|
Task* m_next; ///< Next in the list.
|
|
@@ -54,21 +54,8 @@ public:
|
|
|
ThreadHiveTaskCallback m_cb; ///< Callback that defines the task.
|
|
ThreadHiveTaskCallback m_cb; ///< Callback that defines the task.
|
|
|
void* m_arg; ///< Args for the callback.
|
|
void* m_arg; ///< Args for the callback.
|
|
|
|
|
|
|
|
- Task** m_deps;
|
|
|
|
|
- U16 m_depCount;
|
|
|
|
|
- Bool8 m_othersDepend; ///< Other tasks depend on this one.
|
|
|
|
|
-
|
|
|
|
|
- Task()
|
|
|
|
|
- {
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- Task(const Task& b) = delete;
|
|
|
|
|
- Task& operator=(const Task& b) = delete;
|
|
|
|
|
-
|
|
|
|
|
- Bool done() const
|
|
|
|
|
- {
|
|
|
|
|
- return m_cb == nullptr;
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ ThreadHiveSemaphore* m_waitSemaphore;
|
|
|
|
|
+ ThreadHiveSemaphore* m_signalSemaphore;
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
ThreadHive::ThreadHive(U threadCount, GenericMemoryPoolAllocator<U8> alloc, Bool pinToCores)
|
|
ThreadHive::ThreadHive(U threadCount, GenericMemoryPoolAllocator<U8> alloc, Bool pinToCores)
|
|
@@ -117,47 +104,17 @@ void ThreadHive::submitTasks(ThreadHiveTask* tasks, const U taskCount)
|
|
|
// Allocate tasks
|
|
// Allocate tasks
|
|
|
Task* const htasks = m_alloc.newArray<Task>(taskCount);
|
|
Task* const htasks = m_alloc.newArray<Task>(taskCount);
|
|
|
|
|
|
|
|
- // Allocate the dependency handles
|
|
|
|
|
- U depCount = 0;
|
|
|
|
|
- for(U i = 0; i < taskCount; ++i)
|
|
|
|
|
- {
|
|
|
|
|
- depCount += tasks[i].m_inDependencies.getSize();
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- Task** depHandles;
|
|
|
|
|
- if(depCount)
|
|
|
|
|
- {
|
|
|
|
|
- depHandles = m_alloc.newArray<Task*>(depCount);
|
|
|
|
|
- }
|
|
|
|
|
- else
|
|
|
|
|
- {
|
|
|
|
|
- depHandles = nullptr;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- depCount = 0;
|
|
|
|
|
-
|
|
|
|
|
// Initialize tasks
|
|
// Initialize tasks
|
|
|
for(U i = 0; i < taskCount; ++i)
|
|
for(U i = 0; i < taskCount; ++i)
|
|
|
{
|
|
{
|
|
|
const ThreadHiveTask& inTask = tasks[i];
|
|
const ThreadHiveTask& inTask = tasks[i];
|
|
|
Task& outTask = htasks[i];
|
|
Task& outTask = htasks[i];
|
|
|
|
|
|
|
|
|
|
+ outTask.m_next = nullptr;
|
|
|
outTask.m_cb = inTask.m_callback;
|
|
outTask.m_cb = inTask.m_callback;
|
|
|
outTask.m_arg = inTask.m_argument;
|
|
outTask.m_arg = inTask.m_argument;
|
|
|
- outTask.m_depCount = 0;
|
|
|
|
|
- outTask.m_next = nullptr;
|
|
|
|
|
- outTask.m_othersDepend = false;
|
|
|
|
|
-
|
|
|
|
|
- // Set the dependencies
|
|
|
|
|
- if(inTask.m_inDependencies.getSize() > 0)
|
|
|
|
|
- {
|
|
|
|
|
- outTask.m_deps = &depHandles[depCount];
|
|
|
|
|
- depCount += inTask.m_inDependencies.getSize();
|
|
|
|
|
- }
|
|
|
|
|
- else
|
|
|
|
|
- {
|
|
|
|
|
- outTask.m_deps = nullptr;
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ outTask.m_waitSemaphore = inTask.m_waitSemaphore;
|
|
|
|
|
+ outTask.m_signalSemaphore = inTask.m_signalSemaphore;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// Push work
|
|
// Push work
|
|
@@ -166,22 +123,8 @@ void ThreadHive::submitTasks(ThreadHiveTask* tasks, const U taskCount)
|
|
|
|
|
|
|
|
for(U i = 0; i < taskCount; ++i)
|
|
for(U i = 0; i < taskCount; ++i)
|
|
|
{
|
|
{
|
|
|
- const ThreadHiveTask& inTask = tasks[i];
|
|
|
|
|
Task& outTask = htasks[i];
|
|
Task& outTask = htasks[i];
|
|
|
|
|
|
|
|
- for(U j = 0; j < inTask.m_inDependencies.getSize(); ++j)
|
|
|
|
|
- {
|
|
|
|
|
- ThreadHiveDependencyHandle dep = inTask.m_inDependencies[j];
|
|
|
|
|
- ANKI_ASSERT(dep);
|
|
|
|
|
- Task* depTask = static_cast<Task*>(dep);
|
|
|
|
|
-
|
|
|
|
|
- if(!depTask->done())
|
|
|
|
|
- {
|
|
|
|
|
- outTask.m_deps[outTask.m_depCount++] = depTask;
|
|
|
|
|
- depTask->m_othersDepend = true;
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
// Push to the list
|
|
// Push to the list
|
|
|
ANKI_HIVE_DEBUG_PRINT(
|
|
ANKI_HIVE_DEBUG_PRINT(
|
|
|
"pushing back %p (udata %p)\n", static_cast<void*>(&outTask), static_cast<void*>(outTask.m_arg));
|
|
"pushing back %p (udata %p)\n", static_cast<void*>(&outTask), static_cast<void*>(outTask.m_arg));
|
|
@@ -205,12 +148,6 @@ void ThreadHive::submitTasks(ThreadHiveTask* tasks, const U taskCount)
|
|
|
// Notify all threads
|
|
// Notify all threads
|
|
|
m_cvar.notifyAll();
|
|
m_cvar.notifyAll();
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
- // Set the out dependencies
|
|
|
|
|
- for(U i = 0; i < taskCount; ++i)
|
|
|
|
|
- {
|
|
|
|
|
- tasks[i].m_outDependency = static_cast<ThreadHiveDependencyHandle>(&htasks[i]);
|
|
|
|
|
- }
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
void ThreadHive::threadRun(U threadId)
|
|
void ThreadHive::threadRun(U threadId)
|
|
@@ -223,7 +160,16 @@ void ThreadHive::threadRun(U threadId)
|
|
|
ANKI_ASSERT(task && task->m_cb);
|
|
ANKI_ASSERT(task && task->m_cb);
|
|
|
ANKI_HIVE_DEBUG_PRINT(
|
|
ANKI_HIVE_DEBUG_PRINT(
|
|
|
"tid: %lu will exec %p (udata: %p)\n", threadId, static_cast<void*>(task), static_cast<void*>(task->m_arg));
|
|
"tid: %lu will exec %p (udata: %p)\n", threadId, static_cast<void*>(task), static_cast<void*>(task->m_arg));
|
|
|
- task->m_cb(task->m_arg, threadId, *this);
|
|
|
|
|
|
|
+ task->m_cb(task->m_arg, threadId, *this, task->m_signalSemaphore);
|
|
|
|
|
+
|
|
|
|
|
+ // Signal the semaphore as early as possible
|
|
|
|
|
+ if(task->m_signalSemaphore)
|
|
|
|
|
+ {
|
|
|
|
|
+ const U32 out = task->m_signalSemaphore->m_atomic.fetchSub(1);
|
|
|
|
|
+ (void)out;
|
|
|
|
|
+ ANKI_ASSERT(out > 0u);
|
|
|
|
|
+ ANKI_HIVE_DEBUG_PRINT("\tsem is %u\n", out - 1u);
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
ANKI_HIVE_DEBUG_PRINT("tid: %lu thread quits!\n", threadId);
|
|
ANKI_HIVE_DEBUG_PRINT("tid: %lu thread quits!\n", threadId);
|
|
@@ -238,12 +184,14 @@ Bool ThreadHive::waitForWork(U threadId, Task*& task)
|
|
|
// Complete the previous task
|
|
// Complete the previous task
|
|
|
if(task)
|
|
if(task)
|
|
|
{
|
|
{
|
|
|
|
|
+#if ANKI_EXTRA_CHECKS
|
|
|
task->m_cb = nullptr;
|
|
task->m_cb = nullptr;
|
|
|
|
|
+#endif
|
|
|
--m_pendingTasks;
|
|
--m_pendingTasks;
|
|
|
|
|
|
|
|
- if(task->m_othersDepend || m_pendingTasks == 0)
|
|
|
|
|
|
|
+ if(task->m_signalSemaphore || m_pendingTasks == 0)
|
|
|
{
|
|
{
|
|
|
- // A dependency got resolved or we are out of tasks. Wake them all
|
|
|
|
|
|
|
+ // A dependency maybe got resolved or we are out of tasks. Wake them all
|
|
|
ANKI_HIVE_DEBUG_PRINT("tid: %lu wake all\n", threadId);
|
|
ANKI_HIVE_DEBUG_PRINT("tid: %lu wake all\n", threadId);
|
|
|
m_cvar.notifyAll();
|
|
m_cvar.notifyAll();
|
|
|
}
|
|
}
|
|
@@ -266,20 +214,8 @@ ThreadHive::Task* ThreadHive::getNewTask()
|
|
|
Task* task = m_head;
|
|
Task* task = m_head;
|
|
|
while(task)
|
|
while(task)
|
|
|
{
|
|
{
|
|
|
- ANKI_ASSERT(!task->done());
|
|
|
|
|
-
|
|
|
|
|
// Check if there are dependencies
|
|
// Check if there are dependencies
|
|
|
- Bool allDepsCompleted = true;
|
|
|
|
|
- for(U j = 0; j < task->m_depCount; ++j)
|
|
|
|
|
- {
|
|
|
|
|
- Task* depTask = task->m_deps[j];
|
|
|
|
|
-
|
|
|
|
|
- if(!depTask->done())
|
|
|
|
|
- {
|
|
|
|
|
- allDepsCompleted = false;
|
|
|
|
|
- break;
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ const Bool allDepsCompleted = task->m_waitSemaphore == nullptr || task->m_waitSemaphore->m_atomic.load() == 0;
|
|
|
|
|
|
|
|
if(allDepsCompleted)
|
|
if(allDepsCompleted)
|
|
|
{
|
|
{
|
|
@@ -324,8 +260,6 @@ void ThreadHive::waitAllTasks()
|
|
|
|
|
|
|
|
m_head = nullptr;
|
|
m_head = nullptr;
|
|
|
m_tail = nullptr;
|
|
m_tail = nullptr;
|
|
|
- m_allocatedTasks = 0;
|
|
|
|
|
- m_allocatedDeps = 0;
|
|
|
|
|
m_alloc.getMemoryPool().reset();
|
|
m_alloc.getMemoryPool().reset();
|
|
|
|
|
|
|
|
ANKI_HIVE_DEBUG_PRINT("mt: done waiting all\n");
|
|
ANKI_HIVE_DEBUG_PRINT("mt: done waiting all\n");
|