ThreadPool.h 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. #ifndef ANKI_CORE_PARALLEL_MANAGER_H
  2. #define ANKI_CORE_PARALLEL_MANAGER_H
  3. #include "anki/util/Barrier.h"
  4. #include "anki/util/Singleton.h"
  5. #include "anki/util/Vector.h"
  6. #include "anki/util/StdTypes.h"
  7. #include <thread>
  8. namespace anki {
  9. // Forward
  10. class ThreadPool;
  11. /// A job assignment for a ThreadWorker
  12. struct ThreadJob
  13. {
  14. virtual ~ThreadJob()
  15. {}
  16. virtual void operator()(U threadId, U threadsCount) = 0;
  17. /// Chose a starting and end index
  18. void choseStartEnd(U threadId, U threadsCount, U64 elementsCount,
  19. U64& start, U64& end)
  20. {
  21. start = threadId * (elementsCount / threadsCount);
  22. end = (threadId == threadsCount - 1)
  23. ? elementsCount
  24. : start + elementsCount / threadsCount;
  25. }
  26. };
  27. /// A dummy job
  28. struct ThreadJobDummy
  29. {
  30. void operator()()
  31. {}
  32. };
  33. /// The thread that executes a ThreadJobCallback
  34. class ThreadWorker
  35. {
  36. public:
  37. /// Constructor
  38. ThreadWorker(U32 id, Barrier* barrier, ThreadPool* threadPool);
  39. /// @name Accessors
  40. /// @{
  41. U32 getId() const
  42. {
  43. return id;
  44. }
  45. /// @}
  46. /// Assign new job to the thread
  47. void assignNewJob(ThreadJob* job);
  48. private:
  49. U32 id = 0; ///< An ID
  50. std::thread thread; ///< Runs the workingFunc
  51. std::mutex mutex; ///< Protect the ThreadWorker::job
  52. std::condition_variable condVar; ///< To wake up the thread
  53. Barrier* barrier = nullptr; ///< For synchronization
  54. ThreadJob* job = nullptr; ///< Its NULL if there are no pending job
  55. ThreadPool* threadPool;
  56. /// Start thread
  57. void start()
  58. {
  59. thread = std::thread(&ThreadWorker::workingFunc, this);
  60. }
  61. /// Thread loop
  62. void workingFunc();
  63. };
  64. /// Parallel job dispatcher.You feed it with jobs and sends them for
  65. /// execution in parallel and then waits for all to finish
  66. class ThreadPool
  67. {
  68. public:
  69. static constexpr U MAX_THREADS = 32; ///< An absolute limit
  70. /// Default constructor
  71. ThreadPool()
  72. {}
  73. /// Constructor #2
  74. ThreadPool(uint threadsNum)
  75. {
  76. init(threadsNum);
  77. }
  78. /// Init the manager
  79. void init(U threadsNum);
  80. /// Assign a job to a working thread
  81. void assignNewJob(U jobId, ThreadJob* job)
  82. {
  83. jobs[jobId]->assignNewJob(job);
  84. }
  85. /// Wait for all jobs to finish
  86. void waitForAllJobsToFinish()
  87. {
  88. barrier->wait();
  89. }
  90. uint getThreadsCount() const
  91. {
  92. return jobs.size();
  93. }
  94. private:
  95. PtrVector<ThreadWorker> jobs; ///< Worker threads
  96. std::unique_ptr<Barrier> barrier; ///< Synchronization barrier
  97. };
  98. /// Singleton
  99. typedef Singleton<ThreadPool> ThreadPoolSingleton;
  100. } // end namespace anki
  101. #endif