Thread.h 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. #ifndef ANKI_UTIL_THREAD_H
  2. #define ANKI_UTIL_THREAD_H
  3. #include "anki/Config.h"
  4. #include "anki/util/StdTypes.h"
  5. #include "anki/util/Array.h"
  6. #include "anki/util/Vector.h"
  7. #include <condition_variable>
  8. #include <mutex>
  9. #include <thread>
  10. namespace anki {
  11. #define ANKI_DISABLE_THREADPOOL_THREADING 0
  12. class Threadpool;
  13. typedef U32 ThreadId;
  14. /// A barrier for thread synchronization. It works just like boost::barrier
  15. class Barrier
  16. {
  17. public:
  18. Barrier(U32 count_)
  19. : threshold(count_), count(count_), generation(0)
  20. {
  21. ANKI_ASSERT(count_ != 0);
  22. }
  23. Bool wait()
  24. {
  25. std::unique_lock<std::mutex> lock(mtx);
  26. U32 gen = generation;
  27. if(--count == 0)
  28. {
  29. generation++;
  30. count = threshold;
  31. cond.notify_all();
  32. return true;
  33. }
  34. while(gen == generation)
  35. {
  36. cond.wait(lock);
  37. }
  38. return false;
  39. }
  40. private:
  41. std::mutex mtx;
  42. std::condition_variable cond;
  43. U32 threshold;
  44. U32 count;
  45. U32 generation;
  46. };
  47. /// A task assignment to threads
  48. struct ThreadTask
  49. {
  50. virtual ~ThreadTask()
  51. {}
  52. virtual void operator()(ThreadId threadId) = 0;
  53. };
  54. /// This is a thread with 2 sync points
  55. class DualSyncThread
  56. {
  57. public:
  58. DualSyncThread(ThreadId threadId_)
  59. : id(threadId_),
  60. barriers{{{2}, {2}}},
  61. task(nullptr)
  62. {}
  63. /// The thread does not own the task
  64. /// @note This operation is not thread safe. Call it between syncs
  65. /// @note This class will not own the task_
  66. void setTask(ThreadTask* task_)
  67. {
  68. task = task_;
  69. }
  70. /// Start the thread
  71. void start()
  72. {
  73. thread = std::thread(&DualSyncThread::workingFunc, this);
  74. }
  75. /// Sync with one of the 2 sync points
  76. void sync(U syncPoint)
  77. {
  78. barriers[syncPoint].wait();
  79. }
  80. private:
  81. ThreadId id;
  82. std::thread thread; ///< Runs the workingFunc
  83. Array<Barrier, 2> barriers;
  84. ThreadTask* task; ///< Its nullptr if there are no pending task
  85. /// Thread loop
  86. void workingFunc();
  87. };
  88. /// A task assignment for a ThreadpoolThread
  89. struct ThreadpoolTask
  90. {
  91. virtual ~ThreadpoolTask()
  92. {}
  93. virtual void operator()(ThreadId threadId, U threadsCount) = 0;
  94. /// Chose a starting and end index
  95. void choseStartEnd(ThreadId threadId, PtrSize threadsCount,
  96. PtrSize elementsCount, PtrSize& start, PtrSize& end)
  97. {
  98. start = threadId * (elementsCount / threadsCount);
  99. end = (threadId == threadsCount - 1)
  100. ? elementsCount
  101. : start + elementsCount / threadsCount;
  102. }
  103. };
  104. /// A dummy thread pool task
  105. struct DummyThreadpoolTask: ThreadpoolTask
  106. {
  107. void operator()(ThreadId threadId, U threadsCount)
  108. {
  109. (void)threadId;
  110. (void)threadsCount;
  111. }
  112. };
  113. /// The thread that executes a ThreadpoolTask
  114. class ThreadpoolThread
  115. {
  116. public:
  117. /// Constructor
  118. ThreadpoolThread(ThreadId id, Barrier* barrier, Threadpool* threadpool);
  119. /// Assign new task to the thread
  120. /// @note
  121. void assignNewTask(ThreadpoolTask* task);
  122. private:
  123. ThreadId id; ///< An ID
  124. std::thread thread; ///< Runs the workingFunc
  125. std::mutex mutex; ///< Protect the task
  126. std::condition_variable condVar; ///< To wake up the thread
  127. Barrier* barrier; ///< For synchronization
  128. ThreadpoolTask* task; ///< Its NULL if there are no pending task
  129. Threadpool* threadpool;
  130. /// Start thread
  131. void start()
  132. {
  133. thread = std::thread(&ThreadpoolThread::workingFunc, this);
  134. }
  135. /// Thread loop
  136. void workingFunc();
  137. };
  138. /// Parallel task dispatcher.You feed it with tasks and sends them for
  139. /// execution in parallel and then waits for all to finish
  140. class Threadpool
  141. {
  142. public:
  143. static constexpr U MAX_THREADS = 32; ///< An absolute limit
  144. /// Default constructor
  145. Threadpool()
  146. {}
  147. /// Constructor #2
  148. Threadpool(U threadsNum)
  149. {
  150. init(threadsNum);
  151. }
  152. ~Threadpool();
  153. /// Init the manager
  154. void init(U threadsNum);
  155. /// Assign a task to a working thread
  156. void assignNewTask(U taskId, ThreadpoolTask* task)
  157. {
  158. threads[taskId]->assignNewTask(task);
  159. }
  160. /// Wait for all tasks to finish
  161. void waitForAllThreadsToFinish()
  162. {
  163. #if !ANKI_DISABLE_THREADPOOL_THREADING
  164. barrier->wait();
  165. #endif
  166. }
  167. U32 getThreadsCount() const
  168. {
  169. return threads.size();
  170. }
  171. private:
  172. Vector<ThreadpoolThread*> threads; ///< Worker threads
  173. std::unique_ptr<Barrier> barrier; ///< Synchronization barrier
  174. };
  175. } // end namespace anki
  176. #endif