taskschedulerinternal.h 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367
  1. // ======================================================================== //
  2. // Copyright 2009-2017 Intel Corporation //
  3. // //
  4. // Licensed under the Apache License, Version 2.0 (the "License"); //
  5. // you may not use this file except in compliance with the License. //
  6. // You may obtain a copy of the License at //
  7. // //
  8. // http://www.apache.org/licenses/LICENSE-2.0 //
  9. // //
  10. // Unless required by applicable law or agreed to in writing, software //
  11. // distributed under the License is distributed on an "AS IS" BASIS, //
  12. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. //
  13. // See the License for the specific language governing permissions and //
  14. // limitations under the License. //
  15. // ======================================================================== //
  16. #pragma once
  #include "../sys/platform.h"
  #include "../sys/alloc.h"
  #include "../sys/barrier.h"
  #include "../sys/thread.h"
  #include "../sys/mutex.h"
  #include "../sys/condition.h"
  #include "../sys/ref.h"
  #include "../sys/atomic.h"
  #include "../math/range.h"

  #include <atomic>
  #include <cassert>
  #include <exception>
  #include <list>
  #include <memory>
  #include <new>
  #include <stdexcept>
  #include <vector>
  27. namespace embree
  28. {
  29. struct TaskScheduler : public RefCount
  30. {
  31. ALIGNED_STRUCT;
  32. friend class Device;
  33. static const size_t TASK_STACK_SIZE = 2*1024; //!< task structure stack
  34. static const size_t CLOSURE_STACK_SIZE = 256*1024; //!< stack for task closures
  35. struct Thread;
  36. /*! virtual interface for all tasks */
  37. struct TaskFunction {
  38. virtual void execute() = 0;
  39. };
  40. /*! builds a task interface from a closure */
  41. template<typename Closure>
  42. struct ClosureTaskFunction : public TaskFunction
  43. {
  44. Closure closure;
  45. __forceinline ClosureTaskFunction (const Closure& closure) : closure(closure) {}
  46. void execute() { closure(); };
  47. };
  48. struct __aligned(64) Task
  49. {
  50. /*! states a task can be in */
  51. enum { DONE, INITIALIZED };
  52. /*! switch from one state to another */
  53. __forceinline void switch_state(int from, int to)
  54. {
  55. __memory_barrier();
  56. bool success = state.compare_exchange_strong(from,to);
  57. assert(success);
  58. }
  59. /*! try to switch from one state to another */
  60. __forceinline bool try_switch_state(int from, int to) {
  61. __memory_barrier();
  62. return state.compare_exchange_strong(from,to);
  63. }
  64. /*! increment/decrement dependency counter */
  65. void add_dependencies(int n) {
  66. dependencies+=n;
  67. }
  68. /*! initialize all tasks to DONE state by default */
  69. __forceinline Task()
  70. : state(DONE) {}
  71. /*! construction of new task */
  72. __forceinline Task (TaskFunction* closure, Task* parent, size_t stackPtr, size_t N)
  73. : dependencies(1), stealable(true), closure(closure), parent(parent), stackPtr(stackPtr), N(N)
  74. {
  75. if (parent) parent->add_dependencies(+1);
  76. switch_state(DONE,INITIALIZED);
  77. }
  78. /*! construction of stolen task, stealing thread will decrement initial dependency */
  79. __forceinline Task (TaskFunction* closure, Task* parent)
  80. : dependencies(1), stealable(false), closure(closure), parent(parent), stackPtr(-1), N(1)
  81. {
  82. switch_state(DONE,INITIALIZED);
  83. }
  84. /*! try to steal this task */
  85. bool try_steal(Task& child)
  86. {
  87. if (!stealable) return false;
  88. if (!try_switch_state(INITIALIZED,DONE)) return false;
  89. new (&child) Task(closure, this);
  90. return true;
  91. }
  92. /*! run this task */
  93. __dllexport void run(Thread& thread);
  94. public:
  95. std::atomic<int> state; //!< state this task is in
  96. std::atomic<int> dependencies; //!< dependencies to wait for
  97. std::atomic<bool> stealable; //!< true if task can be stolen
  98. TaskFunction* closure; //!< the closure to execute
  99. Task* parent; //!< parent task to signal when we are finished
  100. size_t stackPtr; //!< stack location where closure is stored
  101. size_t N; //!< approximative size of task
  102. };
  103. struct TaskQueue
  104. {
  105. TaskQueue ()
  106. : left(0), right(0), stackPtr(0) {}
  107. __forceinline void* alloc(size_t bytes, size_t align = 64) {
  108. stackPtr += bytes + ((align - stackPtr) & (align-1));
  109. assert(stackPtr <= CLOSURE_STACK_SIZE);
  110. return &stack[stackPtr-bytes];
  111. }
  112. template<typename Closure>
  113. __forceinline void push_right(Thread& thread, const size_t size, const Closure& closure)
  114. {
  115. assert(right < TASK_STACK_SIZE);
  116. /* allocate new task on right side of stack */
  117. size_t oldStackPtr = stackPtr;
  118. TaskFunction* func = new (alloc(sizeof(ClosureTaskFunction<Closure>))) ClosureTaskFunction<Closure>(closure);
  119. new (&tasks[right++]) Task(func,thread.task,oldStackPtr,size);
  120. /* also move left pointer */
  121. if (left >= right-1) left = right-1;
  122. }
  123. __dllexport bool execute_local(Thread& thread, Task* parent);
  124. bool steal(Thread& thread);
  125. size_t getTaskSizeAtLeft();
  126. bool empty() { return right == 0; }
  127. public:
  128. /* task stack */
  129. Task tasks[TASK_STACK_SIZE];
  130. __aligned(64) std::atomic<size_t> left; //!< threads steal from left
  131. __aligned(64) std::atomic<size_t> right; //!< new tasks are added to the right
  132. /* closure stack */
  133. __aligned(64) char stack[CLOSURE_STACK_SIZE];
  134. size_t stackPtr;
  135. };
  136. /*! thread local structure for each thread */
  137. struct Thread
  138. {
  139. ALIGNED_STRUCT;
  140. Thread (size_t threadIndex, const Ref<TaskScheduler>& scheduler)
  141. : threadIndex(threadIndex), task(nullptr), scheduler(scheduler) {}
  142. __forceinline size_t threadCount() {
  143. return scheduler->threadCounter;
  144. }
  145. size_t threadIndex; //!< ID of this thread
  146. TaskQueue tasks; //!< local task queue
  147. Task* task; //!< current active task
  148. Ref<TaskScheduler> scheduler; //!< pointer to task scheduler
  149. };
  150. /*! pool of worker threads */
  151. struct ThreadPool
  152. {
  153. ThreadPool (bool set_affinity);
  154. ~ThreadPool ();
  155. /*! starts the threads */
  156. __dllexport void startThreads();
  157. /*! sets number of threads to use */
  158. void setNumThreads(size_t numThreads, bool startThreads = false);
  159. /*! adds a task scheduler object for scheduling */
  160. __dllexport void add(const Ref<TaskScheduler>& scheduler);
  161. /*! remove the task scheduler object again */
  162. __dllexport void remove(const Ref<TaskScheduler>& scheduler);
  163. /*! returns number of threads of the thread pool */
  164. size_t size() const { return numThreads; }
  165. /*! main loop for all threads */
  166. void thread_loop(size_t threadIndex);
  167. private:
  168. std::atomic<size_t> numThreads;
  169. std::atomic<size_t> numThreadsRunning;
  170. bool set_affinity;
  171. std::atomic<bool> running;
  172. std::vector<thread_t> threads;
  173. private:
  174. MutexSys mutex;
  175. ConditionSys condition;
  176. std::list<Ref<TaskScheduler> > schedulers;
  177. };
  178. TaskScheduler ();
  179. ~TaskScheduler ();
  180. /*! initializes the task scheduler */
  181. static void create(size_t numThreads, bool set_affinity, bool start_threads);
  182. /*! destroys the task scheduler again */
  183. static void destroy();
  184. /*! lets new worker threads join the tasking system */
  185. void join();
  186. void reset();
  187. /*! let a worker thread allocate a thread index */
  188. __dllexport ssize_t allocThreadIndex();
  189. /*! wait for some number of threads available (threadCount includes main thread) */
  190. void wait_for_threads(size_t threadCount);
  191. /*! thread loop for all worker threads */
  192. std::exception_ptr thread_loop(size_t threadIndex);
  193. /*! steals a task from a different thread */
  194. bool steal_from_other_threads(Thread& thread);
  195. template<typename Predicate, typename Body>
  196. static void steal_loop(Thread& thread, const Predicate& pred, const Body& body);
  197. /* spawn a new task at the top of the threads task stack */
  198. template<typename Closure>
  199. void spawn_root(const Closure& closure, size_t size = 1, bool useThreadPool = true)
  200. {
  201. if (useThreadPool) startThreads();
  202. size_t threadIndex = allocThreadIndex();
  203. std::unique_ptr<Thread> mthread(new Thread(threadIndex,this)); // too large for stack allocation
  204. Thread& thread = *mthread;
  205. assert(threadLocal[threadIndex].load() == nullptr);
  206. threadLocal[threadIndex] = &thread;
  207. Thread* oldThread = swapThread(&thread);
  208. thread.tasks.push_right(thread,size,closure);
  209. {
  210. Lock<MutexSys> lock(mutex);
  211. anyTasksRunning++;
  212. hasRootTask = true;
  213. condition.notify_all();
  214. }
  215. if (useThreadPool) addScheduler(this);
  216. while (thread.tasks.execute_local(thread,nullptr));
  217. anyTasksRunning--;
  218. if (useThreadPool) removeScheduler(this);
  219. threadLocal[threadIndex] = nullptr;
  220. swapThread(oldThread);
  221. /* remember exception to throw */
  222. std::exception_ptr except = nullptr;
  223. if (cancellingException != nullptr) except = cancellingException;
  224. /* wait for all threads to terminate */
  225. threadCounter--;
  226. while (threadCounter > 0) yield();
  227. cancellingException = nullptr;
  228. /* re-throw proper exception */
  229. if (except != nullptr)
  230. std::rethrow_exception(except);
  231. }
  232. /* spawn a new task at the top of the threads task stack */
  233. template<typename Closure>
  234. static __forceinline void spawn(size_t size, const Closure& closure)
  235. {
  236. Thread* thread = TaskScheduler::thread();
  237. if (likely(thread != nullptr)) thread->tasks.push_right(*thread,size,closure);
  238. else instance()->spawn_root(closure,size);
  239. }
  240. /* spawn a new task at the top of the threads task stack */
  241. template<typename Closure>
  242. static __forceinline void spawn(const Closure& closure) {
  243. spawn(1,closure);
  244. }
  245. /* spawn a new task set */
  246. template<typename Index, typename Closure>
  247. static void spawn(const Index begin, const Index end, const Index blockSize, const Closure& closure)
  248. {
  249. spawn(end-begin, [=,&closure]()
  250. {
  251. if (end-begin <= blockSize) {
  252. return closure(range<Index>(begin,end));
  253. }
  254. const Index center = (begin+end)/2;
  255. spawn(begin,center,blockSize,closure);
  256. spawn(center,end ,blockSize,closure);
  257. wait();
  258. });
  259. }
  260. /* work on spawned subtasks and wait until all have finished */
  261. __dllexport static bool wait();
  262. /* returns the index of the current thread */
  263. __dllexport static size_t threadIndex();
  264. /* returns the total number of threads */
  265. __dllexport static size_t threadCount();
  266. private:
  267. /* returns the thread local task list of this worker thread */
  268. __dllexport static Thread* thread();
  269. /* sets the thread local task list of this worker thread */
  270. __dllexport static Thread* swapThread(Thread* thread);
  271. /*! returns the taskscheduler object to be used by the master thread */
  272. __dllexport static TaskScheduler* instance();
  273. /*! starts the threads */
  274. __dllexport static void startThreads();
  275. /*! adds a task scheduler object for scheduling */
  276. __dllexport static void addScheduler(const Ref<TaskScheduler>& scheduler);
  277. /*! remove the task scheduler object again */
  278. __dllexport static void removeScheduler(const Ref<TaskScheduler>& scheduler);
  279. private:
  280. std::vector<atomic<Thread*>> threadLocal;
  281. std::atomic<size_t> threadCounter;
  282. std::atomic<size_t> anyTasksRunning;
  283. std::atomic<bool> hasRootTask;
  284. std::exception_ptr cancellingException;
  285. MutexSys mutex;
  286. ConditionSys condition;
  287. private:
  288. static size_t g_numThreads;
  289. static __thread TaskScheduler* g_instance;
  290. static __thread Thread* thread_local_thread;
  291. static ThreadPool* threadPool;
  292. };
  293. };