ThreadHive.cpp 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. // Copyright (C) 2009-2017, Panagiotis Christopoulos Charitos and contributors.
  2. // All rights reserved.
  3. // Code licensed under the BSD License.
  4. // http://www.anki3d.org/LICENSE
  5. #include <anki/util/ThreadHive.h>
  6. #include <cstring>
  7. #include <cstdio>
  8. namespace anki
  9. {
  10. #define ANKI_ENABLE_HIVE_DEBUG_PRINT 0
  11. #if ANKI_ENABLE_HIVE_DEBUG_PRINT
  12. #define ANKI_HIVE_DEBUG_PRINT(...) printf(__VA_ARGS__)
  13. #else
  14. #define ANKI_HIVE_DEBUG_PRINT(...) ((void)0)
  15. #endif
  16. class ThreadHive::Thread
  17. {
  18. public:
  19. U32 m_id; ///< An ID
  20. anki::Thread m_thread; ///< Runs the workingFunc
  21. ThreadHive* m_hive;
  22. /// Constructor
  23. Thread(U32 id, ThreadHive* hive)
  24. : m_id(id)
  25. , m_thread("anki_threadhive")
  26. , m_hive(hive)
  27. {
  28. ANKI_ASSERT(hive);
  29. m_thread.start(this, threadCallback);
  30. }
  31. private:
  32. /// Thread callaback
  33. static Error threadCallback(anki::ThreadCallbackInfo& info)
  34. {
  35. Thread& self = *static_cast<Thread*>(info.m_userData);
  36. self.m_hive->threadRun(self.m_id);
  37. return Error::NONE;
  38. }
  39. };
  40. class ThreadHive::Task
  41. {
  42. public:
  43. Task* m_next; ///< Next in the list.
  44. ThreadHiveTaskCallback m_cb; ///< Callback that defines the task.
  45. void* m_arg; ///< Args for the callback.
  46. Task** m_deps;
  47. U16 m_depCount;
  48. Bool8 m_othersDepend; ///< Other tasks depend on this one.
  49. Task()
  50. {
  51. }
  52. Task(const Task& b) = delete;
  53. Task& operator=(const Task& b) = delete;
  54. Bool done() const
  55. {
  56. return m_cb == nullptr;
  57. }
  58. };
  59. ThreadHive::ThreadHive(U threadCount, GenericMemoryPoolAllocator<U8> alloc)
  60. : m_slowAlloc(alloc)
  61. , m_alloc(alloc.getMemoryPool().getAllocationCallback(),
  62. alloc.getMemoryPool().getAllocationCallbackUserData(),
  63. 1024 * 4)
  64. , m_threadCount(threadCount)
  65. {
  66. m_threads = reinterpret_cast<Thread*>(m_slowAlloc.allocate(sizeof(Thread) * threadCount));
  67. for(U i = 0; i < threadCount; ++i)
  68. {
  69. ::new(&m_threads[i]) Thread(i, this);
  70. }
  71. }
  72. ThreadHive::~ThreadHive()
  73. {
  74. if(m_threads)
  75. {
  76. {
  77. LockGuard<Mutex> lock(m_mtx);
  78. m_quit = true;
  79. // Wake the threads
  80. m_cvar.notifyAll();
  81. }
  82. // Join and destroy
  83. U threadCount = m_threadCount;
  84. while(threadCount-- != 0)
  85. {
  86. Error err = m_threads[threadCount].m_thread.join();
  87. (void)err;
  88. m_threads[threadCount].~Thread();
  89. }
  90. m_slowAlloc.deallocate(static_cast<void*>(m_threads), m_threadCount * sizeof(Thread));
  91. }
  92. }
  93. void ThreadHive::submitTasks(ThreadHiveTask* tasks, const U taskCount)
  94. {
  95. ANKI_ASSERT(tasks && taskCount > 0);
  96. // Allocate tasks
  97. Task* const htasks = m_alloc.newArray<Task>(taskCount);
  98. // Allocate the dependency handles
  99. U depCount = 0;
  100. for(U i = 0; i < taskCount; ++i)
  101. {
  102. depCount += tasks[i].m_inDependencies.getSize();
  103. }
  104. Task** depHandles;
  105. if(depCount)
  106. {
  107. depHandles = m_alloc.newArray<Task*>(depCount);
  108. }
  109. else
  110. {
  111. depHandles = nullptr;
  112. }
  113. depCount = 0;
  114. // Initialize tasks
  115. for(U i = 0; i < taskCount; ++i)
  116. {
  117. const ThreadHiveTask& inTask = tasks[i];
  118. Task& outTask = htasks[i];
  119. outTask.m_cb = inTask.m_callback;
  120. outTask.m_arg = inTask.m_argument;
  121. outTask.m_depCount = 0;
  122. outTask.m_next = nullptr;
  123. outTask.m_othersDepend = false;
  124. // Set the dependencies
  125. if(inTask.m_inDependencies.getSize() > 0)
  126. {
  127. outTask.m_deps = &depHandles[depCount];
  128. depCount += inTask.m_inDependencies.getSize();
  129. }
  130. else
  131. {
  132. outTask.m_deps = nullptr;
  133. }
  134. }
  135. // Push work
  136. {
  137. LockGuard<Mutex> lock(m_mtx);
  138. for(U i = 0; i < taskCount; ++i)
  139. {
  140. const ThreadHiveTask& inTask = tasks[i];
  141. Task& outTask = htasks[i];
  142. for(U j = 0; j < inTask.m_inDependencies.getSize(); ++j)
  143. {
  144. ThreadHiveDependencyHandle dep = inTask.m_inDependencies[j];
  145. ANKI_ASSERT(dep);
  146. Task* depTask = static_cast<Task*>(dep);
  147. if(!depTask->done())
  148. {
  149. outTask.m_deps[outTask.m_depCount++] = depTask;
  150. depTask->m_othersDepend = true;
  151. }
  152. }
  153. // Push to the list
  154. ANKI_HIVE_DEBUG_PRINT(
  155. "pushing back %p (udata %p)\n", static_cast<void*>(&outTask), static_cast<void*>(outTask.m_arg));
  156. if(m_head != nullptr)
  157. {
  158. ANKI_ASSERT(m_tail && m_head);
  159. m_tail->m_next = &outTask;
  160. m_tail = &outTask;
  161. }
  162. else
  163. {
  164. ANKI_ASSERT(m_tail == nullptr);
  165. m_head = &outTask;
  166. m_tail = m_head;
  167. }
  168. }
  169. m_pendingTasks += taskCount;
  170. ANKI_HIVE_DEBUG_PRINT("submit tasks\n");
  171. // Notify all threads
  172. m_cvar.notifyAll();
  173. }
  174. // Set the out dependencies
  175. for(U i = 0; i < taskCount; ++i)
  176. {
  177. tasks[i].m_outDependency = static_cast<ThreadHiveDependencyHandle>(&htasks[i]);
  178. }
  179. }
  180. void ThreadHive::threadRun(U threadId)
  181. {
  182. Task* task = nullptr;
  183. while(!waitForWork(threadId, task))
  184. {
  185. // Run the task
  186. ANKI_ASSERT(task && task->m_cb);
  187. ANKI_HIVE_DEBUG_PRINT(
  188. "tid: %lu will exec %p (udata: %p)\n", threadId, static_cast<void*>(task), static_cast<void*>(task->m_arg));
  189. task->m_cb(task->m_arg, threadId, *this);
  190. }
  191. ANKI_HIVE_DEBUG_PRINT("tid: %lu thread quits!\n", threadId);
  192. }
  193. Bool ThreadHive::waitForWork(U threadId, Task*& task)
  194. {
  195. LockGuard<Mutex> lock(m_mtx);
  196. ANKI_HIVE_DEBUG_PRINT("tid: %lu locking\n", threadId);
  197. // Complete the previous task
  198. if(task)
  199. {
  200. task->m_cb = nullptr;
  201. --m_pendingTasks;
  202. if(task->m_othersDepend || m_pendingTasks == 0)
  203. {
  204. // A dependency got resolved or we are out of tasks. Wake them all
  205. ANKI_HIVE_DEBUG_PRINT("tid: %lu wake all\n", threadId);
  206. m_cvar.notifyAll();
  207. }
  208. }
  209. while(!m_quit && (task = getNewTask()) == nullptr)
  210. {
  211. ANKI_HIVE_DEBUG_PRINT("tid: %lu waiting\n", threadId);
  212. // Wait if there is no work.
  213. m_cvar.wait(m_mtx);
  214. }
  215. return m_quit;
  216. }
  217. ThreadHive::Task* ThreadHive::getNewTask()
  218. {
  219. Task* prevTask = nullptr;
  220. Task* task = m_head;
  221. while(task)
  222. {
  223. ANKI_ASSERT(!task->done());
  224. // Check if there are dependencies
  225. Bool allDepsCompleted = true;
  226. for(U j = 0; j < task->m_depCount; ++j)
  227. {
  228. Task* depTask = task->m_deps[j];
  229. if(!depTask->done())
  230. {
  231. allDepsCompleted = false;
  232. break;
  233. }
  234. }
  235. if(allDepsCompleted)
  236. {
  237. // Found something, pop it
  238. if(prevTask)
  239. {
  240. prevTask->m_next = task->m_next;
  241. }
  242. if(m_head == task)
  243. {
  244. m_head = task->m_next;
  245. }
  246. if(m_tail == task)
  247. {
  248. m_tail = prevTask;
  249. }
  250. #if ANKI_EXTRA_CHECKS
  251. task->m_next = nullptr;
  252. #endif
  253. break;
  254. }
  255. prevTask = task;
  256. task = task->m_next;
  257. }
  258. return task;
  259. }
  260. void ThreadHive::waitAllTasks()
  261. {
  262. ANKI_HIVE_DEBUG_PRINT("mt: waiting all\n");
  263. LockGuard<Mutex> lock(m_mtx);
  264. while(m_pendingTasks > 0)
  265. {
  266. m_cvar.wait(m_mtx);
  267. }
  268. m_head = nullptr;
  269. m_tail = nullptr;
  270. m_allocatedTasks = 0;
  271. m_allocatedDeps = 0;
  272. m_alloc.getMemoryPool().reset();
  273. ANKI_HIVE_DEBUG_PRINT("mt: done waiting all\n");
  274. }
  275. } // end namespace anki