ThreadPool.h 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. #ifndef ANKI_CORE_PARALLEL_MANAGER_H
  2. #define ANKI_CORE_PARALLEL_MANAGER_H
  3. #include "anki/util/Barrier.h"
  4. #include "anki/util/Singleton.h"
  5. #include "anki/util/Vector.h"
  6. #include "anki/util/StdTypes.h"
  7. #include <thread>
  8. namespace anki {
  9. // Forward
  10. class ThreadPool;
  11. /// A job assignment for a ThreadWorker
  12. struct ThreadJob
  13. {
  14. virtual ~ThreadJob()
  15. {}
  16. virtual void operator()(U threadId, U threadsCount) = 0;
  17. /// Chose a starting and end index
  18. void choseStartEnd(U threadId, U threadsCount, U64 elementsCount,
  19. U64& start, U64& end)
  20. {
  21. start = threadId * (elementsCount / threadsCount);
  22. end = (threadId == threadsCount - 1)
  23. ? elementsCount
  24. : start + elementsCount / threadsCount;
  25. }
  26. };
  27. /// A dummy job
  28. struct ThreadJobDummy: ThreadJob
  29. {
  30. void operator()(U threadId, U threadsCount)
  31. {
  32. (void)threadId;
  33. (void)threadsCount;
  34. }
  35. };
  36. /// The thread that executes a ThreadJobCallback
  37. class ThreadWorker
  38. {
  39. public:
  40. /// Constructor
  41. ThreadWorker(U32 id, Barrier* barrier, ThreadPool* threadPool);
  42. /// @name Accessors
  43. /// @{
  44. U32 getId() const
  45. {
  46. return id;
  47. }
  48. /// @}
  49. /// Assign new job to the thread
  50. void assignNewJob(ThreadJob* job);
  51. private:
  52. U32 id = 0; ///< An ID
  53. std::thread thread; ///< Runs the workingFunc
  54. std::mutex mutex; ///< Protect the ThreadWorker::job
  55. std::condition_variable condVar; ///< To wake up the thread
  56. Barrier* barrier = nullptr; ///< For synchronization
  57. ThreadJob* job = nullptr; ///< Its NULL if there are no pending job
  58. ThreadPool* threadPool;
  59. /// Start thread
  60. void start()
  61. {
  62. thread = std::thread(&ThreadWorker::workingFunc, this);
  63. }
  64. /// Thread loop
  65. void workingFunc();
  66. };
  67. /// Parallel job dispatcher.You feed it with jobs and sends them for
  68. /// execution in parallel and then waits for all to finish
  69. class ThreadPool
  70. {
  71. public:
  72. static constexpr U MAX_THREADS = 32; ///< An absolute limit
  73. /// Default constructor
  74. ThreadPool()
  75. {}
  76. /// Constructor #2
  77. ThreadPool(U threadsNum)
  78. {
  79. init(threadsNum);
  80. }
  81. /// Init the manager
  82. void init(U threadsNum);
  83. /// Assign a job to a working thread
  84. void assignNewJob(U jobId, ThreadJob* job)
  85. {
  86. jobs[jobId]->assignNewJob(job);
  87. }
  88. /// Wait for all jobs to finish
  89. void waitForAllJobsToFinish()
  90. {
  91. barrier->wait();
  92. }
  93. U32 getThreadsCount() const
  94. {
  95. return jobs.size();
  96. }
  97. private:
  98. PtrVector<ThreadWorker> jobs; ///< Worker threads
  99. std::unique_ptr<Barrier> barrier; ///< Synchronization barrier
  100. };
  101. /// Singleton
  102. typedef Singleton<ThreadPool> ThreadPoolSingleton;
  103. } // end namespace anki
  104. #endif