Thread.h 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. // Copyright (C) 2009-2016, Panagiotis Christopoulos Charitos.
  2. // All rights reserved.
  3. // Code licensed under the BSD License.
  4. // http://www.anki3d.org/LICENSE
  5. #pragma once
  6. #include <anki/util/StdTypes.h>
  7. #include <anki/util/Array.h>
  8. #include <anki/util/NonCopyable.h>
  9. #include <atomic>
  10. #define ANKI_DISABLE_THREADPOOL_THREADING 0
  11. namespace anki
  12. {
  13. /// @addtogroup util_thread
  14. /// @{
  15. /// Thread implementation
  16. class Thread : public NonCopyable
  17. {
  18. public:
  19. using Id = U64;
  20. /// It holds some information to be passed to the thread's callback
  21. class Info
  22. {
  23. public:
  24. void* m_userData;
  25. const char* m_threadName;
  26. };
  27. /// The type of the tread callback
  28. using Callback = Error (*)(Info&);
  29. /// Create a thread with or without a name
  30. /// @param[in] name The name of the new thread. Can be nullptr
  31. Thread(const char* name);
  32. ~Thread();
  33. /// Start the thread
  34. /// @param userData The user data of the thread callback
  35. /// @param callback The thread callback that will be executed
  36. void start(void* userData, Callback callback);
  37. /// Wait for the thread to finish
  38. /// @return The error code of the thread's callback
  39. ANKI_USE_RESULT Error join();
  40. /// Identify the current thread
  41. static Id getCurrentThreadId();
  42. anki_internal:
  43. const char* getName() const
  44. {
  45. return &m_name[0];
  46. }
  47. void* getUserData() const
  48. {
  49. return m_userData;
  50. }
  51. Callback getCallback() const
  52. {
  53. return m_callback;
  54. }
  55. private:
  56. void* m_impl = nullptr; ///< The system native type
  57. Array<char, 32> m_name; ///< The name of the thread
  58. Callback m_callback = nullptr; ///< The callback
  59. void* m_userData = nullptr; ///< The user date to pass to the callback
  60. #if ANKI_ASSERTIONS
  61. Bool8 m_started = false;
  62. #endif
  63. };
  64. /// Mutex
  65. class Mutex : public NonCopyable
  66. {
  67. friend class ConditionVariable;
  68. public:
  69. Mutex();
  70. ~Mutex();
  71. /// Lock
  72. void lock();
  73. /// Try lock
  74. /// @return True if it was locked successfully
  75. Bool tryLock();
  76. /// Unlock
  77. void unlock();
  78. private:
  79. void* m_impl = nullptr; ///< The system native type
  80. };
  81. /// Condition variable
  82. class ConditionVariable : public NonCopyable
  83. {
  84. public:
  85. ConditionVariable();
  86. ~ConditionVariable();
  87. /// Signal one thread
  88. void notifyOne();
  89. /// Signal all threads
  90. void notifyAll();
  91. /// Bock until signaled.
  92. /// @param mtx The mutex.
  93. void wait(Mutex& mtx);
  94. private:
  95. void* m_impl = nullptr; ///< The system native type
  96. };
  97. /// Spin lock. Good if the critical section will be executed in a short period
  98. /// of time
  99. class SpinLock : public NonCopyable
  100. {
  101. public:
  102. /// Lock
  103. void lock()
  104. {
  105. while(m_lock.test_and_set(std::memory_order_acquire))
  106. {
  107. }
  108. }
  109. /// Unlock
  110. void unlock()
  111. {
  112. m_lock.clear(std::memory_order_release);
  113. }
  114. private:
  115. std::atomic_flag m_lock = ATOMIC_FLAG_INIT;
  116. };
  117. /// Lock guard. When constructed it locks a TMutex and unlocks it when it gets
  118. /// destroyed.
  119. template<typename TMutex>
  120. class LockGuard
  121. {
  122. public:
  123. LockGuard(TMutex& mtx)
  124. : m_mtx(&mtx)
  125. {
  126. m_mtx->lock();
  127. }
  128. ~LockGuard()
  129. {
  130. m_mtx->unlock();
  131. }
  132. private:
  133. TMutex* m_mtx;
  134. };
  135. /// A barrier for thread synchronization. It works almost like boost::barrier
  136. class Barrier : public NonCopyable
  137. {
  138. public:
  139. Barrier(U32 count);
  140. ~Barrier();
  141. /// Wait until all threads call wait().
  142. Bool wait();
  143. private:
  144. void* m_impl = nullptr;
  145. };
  146. // Forward
  147. namespace detail
  148. {
  149. class ThreadPoolThread;
  150. }
  151. /// Parallel task dispatcher. You feed it with tasks and sends them for
  152. /// execution in parallel and then waits for all to finish
  153. class ThreadPool : public NonCopyable
  154. {
  155. friend class detail::ThreadPoolThread;
  156. public:
  157. static constexpr U MAX_THREADS = 32; ///< An absolute limit
  158. /// A task assignment for a ThreadPool
  159. class Task
  160. {
  161. public:
  162. virtual ~Task()
  163. {
  164. }
  165. virtual Error operator()(U32 taskId, PtrSize threadsCount) = 0;
  166. /// Chose a starting and end index
  167. static void choseStartEnd(U32 taskId,
  168. PtrSize threadsCount,
  169. PtrSize elementsCount,
  170. PtrSize& start,
  171. PtrSize& end)
  172. {
  173. F32 tid = taskId;
  174. F32 div = F32(elementsCount) / threadsCount;
  175. start = PtrSize(tid * div);
  176. end = PtrSize((tid + 1.0) * div);
  177. }
  178. };
  179. /// Constructor
  180. ThreadPool(U32 threadsCount);
  181. ~ThreadPool();
  182. /// Assign a task to a working thread
  183. /// @param slot The slot of the task
  184. /// @param task The task. If it's nullptr then a dummy task will be assigned
  185. void assignNewTask(U32 slot, Task* task);
  186. /// Wait for all tasks to finish.
  187. /// @return The error code in one of the worker threads.
  188. ANKI_USE_RESULT Error waitForAllThreadsToFinish()
  189. {
  190. #if !ANKI_DISABLE_THREADPOOL_THREADING
  191. m_barrier.wait();
  192. m_tasksAssigned = 0;
  193. #endif
  194. Error err = m_err;
  195. m_err = ErrorCode::NONE;
  196. return err;
  197. }
  198. PtrSize getThreadsCount() const
  199. {
  200. return m_threadsCount;
  201. }
  202. private:
  203. /// A dummy task for a ThreadPool
  204. class DummyTask : public Task
  205. {
  206. public:
  207. Error operator()(U32 taskId, PtrSize threadsCount)
  208. {
  209. (void)taskId;
  210. (void)threadsCount;
  211. return ErrorCode::NONE;
  212. }
  213. };
  214. #if !ANKI_DISABLE_THREADPOOL_THREADING
  215. Barrier m_barrier; ///< Synchronization barrier
  216. detail::ThreadPoolThread* m_threads = nullptr; ///< Threads array
  217. U m_tasksAssigned = 0;
  218. #endif
  219. U8 m_threadsCount = 0;
  220. Error m_err = ErrorCode::NONE;
  221. static DummyTask m_dummyTask;
  222. };
  223. /// @}
  224. } // end namespace anki