2
0

JobSystemThreadPool.h 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. // SPDX-FileCopyrightText: 2021 Jorrit Rouwe
  2. // SPDX-License-Identifier: MIT
  3. #pragma once
  4. #include <Jolt/Core/JobSystem.h>
  5. #include <Jolt/Core/FixedSizeFreeList.h>
  6. JPH_SUPPRESS_WARNINGS_STD_BEGIN
  7. #include <thread>
  8. #include <mutex>
  9. #include <condition_variable>
  10. JPH_SUPPRESS_WARNINGS_STD_END
  11. JPH_NAMESPACE_BEGIN
  12. // Things we're using from STL
  13. using std::atomic;
  14. using std::thread;
  15. using std::condition_variable;
  16. /// Implementation of a JobSystem using a thread pool
  17. ///
  18. /// Note that this is considered an example implementation. It is expected that when you integrate
  19. /// the physics engine into your own project that you'll provide your own implementation of the
  20. /// JobSystem built on top of whatever job system your project uses.
  21. class JobSystemThreadPool final : public JobSystem
  22. {
  23. public:
  24. JPH_OVERRIDE_NEW_DELETE
  25. /// Creates a thread pool.
  26. /// @see JobSystemThreadPool::Init
  27. JobSystemThreadPool(uint inMaxJobs, uint inMaxBarriers, int inNumThreads = -1);
  28. JobSystemThreadPool() = default;
  29. virtual ~JobSystemThreadPool() override;
  30. /// Initialize the thread pool
  31. /// @param inMaxJobs Max number of jobs that can be allocated at any time
  32. /// @param inMaxBarriers Max number of barriers that can be allocated at any time
  33. /// @param inNumThreads Number of threads to start (the number of concurrent jobs is 1 more because the main thread will also run jobs while waiting for a barrier to complete). Use -1 to autodetect the amount of CPU's.
  34. void Init(uint inMaxJobs, uint inMaxBarriers, int inNumThreads = -1);
  35. // See JobSystem
  36. virtual int GetMaxConcurrency() const override { return int(mThreads.size()) + 1; }
  37. virtual JobHandle CreateJob(const char *inName, ColorArg inColor, const JobFunction &inJobFunction, uint32 inNumDependencies = 0) override;
  38. virtual Barrier * CreateBarrier() override;
  39. virtual void DestroyBarrier(Barrier *inBarrier) override;
  40. virtual void WaitForJobs(Barrier *inBarrier) override;
  41. /// Change the max concurrency after initialization
  42. void SetNumThreads(int inNumThreads) { StopThreads(); StartThreads(inNumThreads); }
  43. protected:
  44. // See JobSystem
  45. virtual void QueueJob(Job *inJob) override;
  46. virtual void QueueJobs(Job **inJobs, uint inNumJobs) override;
  47. virtual void FreeJob(Job *inJob) override;
  48. private:
  49. /// When we switch to C++20 we can use counting_semaphore to unify this
  50. class Semaphore
  51. {
  52. public:
  53. /// Constructor
  54. inline Semaphore();
  55. inline ~Semaphore();
  56. /// Release the semaphore, signalling the thread waiting on the barrier that there may be work
  57. inline void Release(uint inNumber = 1);
  58. /// Acquire the semaphore inNumber times
  59. inline void Acquire(uint inNumber = 1);
  60. /// Get the current value of the semaphore
  61. inline int GetValue() const { return mCount; }
  62. private:
  63. #ifdef JPH_PLATFORM_WINDOWS
  64. // On windows we use a semaphore object since it is more efficient than a lock and a condition variable
  65. alignas(JPH_CACHE_LINE_SIZE) atomic<int> mCount { 0 }; ///< We increment mCount for every release, to acquire we decrement the count. If the count is negative we know that we are waiting on the actual semaphore.
  66. void * mSemaphore; ///< The semaphore is an expensive construct so we only acquire/release it if we know that we need to wait/have waiting threads
  67. #else
  68. // Other platforms: Emulate a semaphore using a mutex, condition variable and count
  69. mutex mLock;
  70. condition_variable mWaitVariable;
  71. int mCount = 0;
  72. #endif
  73. };
  74. class BarrierImpl : public Barrier
  75. {
  76. public:
  77. JPH_OVERRIDE_NEW_DELETE
  78. /// Constructor
  79. BarrierImpl();
  80. virtual ~BarrierImpl() override;
  81. // See Barrier
  82. virtual void AddJob(const JobHandle &inJob) override;
  83. virtual void AddJobs(const JobHandle *inHandles, uint inNumHandles) override;
  84. /// Check if there are any jobs in the job barrier
  85. inline bool IsEmpty() const { return mJobReadIndex == mJobWriteIndex; }
  86. /// Wait for all jobs in this job barrier, while waiting, execute jobs that are part of this barrier on the current thread
  87. void Wait();
  88. /// Flag to indicate if a barrier has been handed out
  89. atomic<bool> mInUse { false };
  90. protected:
  91. /// Called by a Job to mark that it is finished
  92. virtual void OnJobFinished(Job *inJob) override;
  93. /// Jobs queue for the barrier
  94. static constexpr uint cMaxJobs = 2048;
  95. static_assert(IsPowerOf2(cMaxJobs)); // We do bit operations and require max jobs to be a power of 2
  96. atomic<Job *> mJobs[cMaxJobs]; ///< List of jobs that are part of this barrier, nullptrs for empty slots
  97. alignas(JPH_CACHE_LINE_SIZE) atomic<uint> mJobReadIndex { 0 }; ///< First job that could be valid (modulo cMaxJobs), can be nullptr if other thread is still working on adding the job
  98. alignas(JPH_CACHE_LINE_SIZE) atomic<uint> mJobWriteIndex { 0 }; ///< First job that can be written (modulo cMaxJobs)
  99. atomic<int> mNumToAcquire { 0 }; ///< Number of times the semaphore has been released, the barrier should acquire the semaphore this many times (written at the same time as mJobWriteIndex so ok to put in same cache line)
  100. Semaphore mSemaphore; ///< Semaphore used by finishing jobs to signal the barrier that they're done
  101. };
  102. /// Start/stop the worker threads
  103. void StartThreads(int inNumThreads);
  104. void StopThreads();
  105. /// Entry point for a thread
  106. void ThreadMain(int inThreadIndex);
  107. /// Get the head of the thread that has processed the least amount of jobs
  108. inline uint GetHead() const;
  109. /// Internal helper function to queue a job
  110. inline void QueueJobInternal(Job *inJob);
  111. /// Array of jobs (fixed size)
  112. using AvailableJobs = FixedSizeFreeList<Job>;
  113. AvailableJobs mJobs;
  114. /// Array of barriers (we keep them constructed all the time since constructing a semaphore/mutex is not cheap)
  115. uint mMaxBarriers = 0; ///< Max amount of barriers
  116. BarrierImpl * mBarriers = nullptr; ///< List of the actual barriers
  117. /// Threads running jobs
  118. Array<thread> mThreads;
  119. // The job queue
  120. static constexpr uint32 cQueueLength = 1024;
  121. static_assert(IsPowerOf2(cQueueLength)); // We do bit operations and require queue length to be a power of 2
  122. atomic<Job *> mQueue[cQueueLength];
  123. // Head and tail of the queue, do this value modulo cQueueLength - 1 to get the element in the mQueue array
  124. atomic<uint> * mHeads = nullptr; ///< Per executing thread the head of the current queue
  125. alignas(JPH_CACHE_LINE_SIZE) atomic<uint> mTail = 0; ///< Tail (write end) of the queue
  126. // Semaphore used to signal worker threads that there is new work
  127. Semaphore mSemaphore;
  128. /// Boolean to indicate that we want to stop the job system
  129. atomic<bool> mQuit = false;
  130. };
  131. JPH_NAMESPACE_END