JobSystem.h 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. // Jolt Physics Library (https://github.com/jrouwe/JoltPhysics)
  2. // SPDX-FileCopyrightText: 2021 Jorrit Rouwe
  3. // SPDX-License-Identifier: MIT
  4. #pragma once
  5. #include <Jolt/Core/Reference.h>
  6. #include <Jolt/Core/Color.h>
  7. #include <Jolt/Core/Profiler.h>
  8. #include <Jolt/Core/NonCopyable.h>
  9. #include <Jolt/Core/StaticArray.h>
  10. #include <Jolt/Core/Atomics.h>
  11. JPH_NAMESPACE_BEGIN
  12. /// A class that allows units of work (Jobs) to be scheduled across multiple threads.
  13. /// It allows dependencies between the jobs so that the jobs form a graph.
  14. ///
  15. /// The pattern for using this class is:
  16. ///
  17. /// // Create job system
  18. /// JobSystem *job_system = new JobSystemThreadPool(...);
  19. ///
  20. /// // Create some jobs
  21. /// JobHandle second_job = job_system->CreateJob("SecondJob", Color::sRed, []() { ... }, 1); // Create a job with 1 dependency
  22. /// JobHandle first_job = job_system->CreateJob("FirstJob", Color::sGreen, [second_job]() { ....; second_job.RemoveDependency(); }, 0); // Job can start immediately, will start second job when it's done
  23. /// JobHandle third_job = job_system->CreateJob("ThirdJob", Color::sBlue, []() { ... }, 0); // This job can run immediately as well and can run in parallel to job 1 and 2
  24. ///
  25. /// // Add the jobs to the barrier so that we can execute them while we're waiting
  26. /// Barrier *barrier = job_system->CreateBarrier();
  27. /// barrier->AddJob(first_job);
  28. /// barrier->AddJob(second_job);
  29. /// barrier->AddJob(third_job);
  30. /// job_system->WaitForJobs(barrier);
  31. ///
  32. /// // Clean up
  33. /// job_system->DestroyBarrier(barrier);
  34. /// delete job_system;
  35. ///
  36. /// Jobs are guaranteed to be started in the order that their dependency counter becomes zero (in case they're scheduled on a background thread)
  37. /// or in the order they're added to the barrier (when dependency count is zero and when executing on the thread that calls WaitForJobs).
  38. class JobSystem : public NonCopyable
  39. {
  40. protected:
  41. class Job;
  42. public:
  43. JPH_OVERRIDE_NEW_DELETE
  44. /// A job handle contains a reference to a job. The job will be deleted as soon as there are no JobHandles.
  45. /// referring to the job and when it is not in the job queue / being processed.
  46. class JobHandle : private Ref<Job>
  47. {
  48. public:
  49. /// Constructor
  50. inline JobHandle() = default;
  51. inline JobHandle(const JobHandle &inHandle) = default;
  52. inline JobHandle(JobHandle &&inHandle) noexcept : Ref<Job>(std::move(inHandle)) { }
  53. /// Constructor, only to be used by JobSystem
  54. inline explicit JobHandle(Job *inJob) : Ref<Job>(inJob) { }
  55. /// Assignment
  56. inline JobHandle & operator = (const JobHandle &inHandle) { Ref<Job>::operator = (inHandle); return *this; }
  57. inline JobHandle & operator = (JobHandle &&inHandle) noexcept { Ref<Job>::operator = (std::move(inHandle)); return *this; }
  58. /// Check if this handle contains a job
  59. inline bool IsValid() const { return GetPtr() != nullptr; }
  60. /// Check if this job has finished executing
  61. inline bool IsDone() const { return GetPtr() != nullptr && GetPtr()->IsDone(); }
  62. /// Add to the dependency counter.
  63. inline void AddDependency(int inCount = 1) const { GetPtr()->AddDependency(inCount); }
  64. /// Remove from the dependency counter. Job will start whenever the dependency counter reaches zero
  65. /// and if it does it is no longer valid to call the AddDependency/RemoveDependency functions.
  66. inline void RemoveDependency(int inCount = 1) const { GetPtr()->RemoveDependencyAndQueue(inCount); }
  67. /// Remove a dependency from a batch of jobs at once, this can be more efficient than removing them one by one as it requires less locking
  68. static inline void sRemoveDependencies(JobHandle *inHandles, uint inNumHandles, int inCount = 1);
  69. /// Helper function to remove dependencies on a static array of job handles
  70. template <uint N>
  71. static inline void sRemoveDependencies(StaticArray<JobHandle, N> &inHandles, int inCount = 1)
  72. {
  73. sRemoveDependencies(inHandles.data(), inHandles.size(), inCount);
  74. }
  75. /// Inherit the GetPtr function, only to be used by the JobSystem
  76. using Ref<Job>::GetPtr;
  77. };
  78. /// A job barrier keeps track of a number of jobs and allows waiting until they are all completed.
  79. class Barrier : public NonCopyable
  80. {
  81. public:
  82. JPH_OVERRIDE_NEW_DELETE
  83. /// Add a job to this barrier
  84. /// Note that jobs can keep being added to the barrier while waiting for the barrier
  85. virtual void AddJob(const JobHandle &inJob) = 0;
  86. /// Add multiple jobs to this barrier
  87. /// Note that jobs can keep being added to the barrier while waiting for the barrier
  88. virtual void AddJobs(const JobHandle *inHandles, uint inNumHandles) = 0;
  89. protected:
  90. /// Job needs to be able to call OnJobFinished
  91. friend class Job;
  92. /// Destructor, you should call JobSystem::DestroyBarrier instead of destructing this object directly
  93. virtual ~Barrier() = default;
  94. /// Called by a Job to mark that it is finished
  95. virtual void OnJobFinished(Job *inJob) = 0;
  96. };
  97. /// Main function of the job
  98. using JobFunction = function<void()>;
  99. /// Destructor
  100. virtual ~JobSystem() = default;
  101. /// Get maximum number of concurrently executing jobs
  102. virtual int GetMaxConcurrency() const = 0;
  103. /// Create a new job, the job is started immediately if inNumDependencies == 0 otherwise it starts when
  104. /// RemoveDependency causes the dependency counter to reach 0.
  105. virtual JobHandle CreateJob(const char *inName, ColorArg inColor, const JobFunction &inJobFunction, uint32 inNumDependencies = 0) = 0;
  106. /// Create a new barrier, used to wait on jobs
  107. virtual Barrier * CreateBarrier() = 0;
  108. /// Destroy a barrier when it is no longer used. The barrier should be empty at this point.
  109. virtual void DestroyBarrier(Barrier *inBarrier) = 0;
  110. /// Wait for a set of jobs to be finished, note that only 1 thread can be waiting on a barrier at a time
  111. virtual void WaitForJobs(Barrier *inBarrier) = 0;
  112. protected:
  113. /// A class that contains information for a single unit of work
  114. class Job
  115. {
  116. public:
  117. JPH_OVERRIDE_NEW_DELETE
  118. /// Constructor
  119. Job([[maybe_unused]] const char *inJobName, [[maybe_unused]] ColorArg inColor, JobSystem *inJobSystem, const JobFunction &inJobFunction, uint32 inNumDependencies) :
  120. #if defined(JPH_EXTERNAL_PROFILE) || defined(JPH_PROFILE_ENABLED)
  121. mJobName(inJobName),
  122. mColor(inColor),
  123. #endif // defined(JPH_EXTERNAL_PROFILE) || defined(JPH_PROFILE_ENABLED)
  124. mJobSystem(inJobSystem),
  125. mJobFunction(inJobFunction),
  126. mNumDependencies(inNumDependencies)
  127. {
  128. }
  129. /// Get the jobs system to which this job belongs
  130. inline JobSystem * GetJobSystem() { return mJobSystem; }
  131. /// Add or release a reference to this object
  132. inline void AddRef()
  133. {
  134. // Adding a reference can use relaxed memory ordering
  135. mReferenceCount.fetch_add(1, memory_order_relaxed);
  136. }
  137. inline void Release()
  138. {
  139. // Releasing a reference must use release semantics...
  140. if (mReferenceCount.fetch_sub(1, memory_order_release) == 1)
  141. {
  142. // ... so that we can use aquire to ensure that we see any updates from other threads that released a ref before freeing the job
  143. atomic_thread_fence(memory_order_acquire);
  144. mJobSystem->FreeJob(this);
  145. }
  146. }
  147. /// Add to the dependency counter.
  148. inline void AddDependency(int inCount);
  149. /// Remove from the dependency counter. Returns true whenever the dependency counter reaches zero
  150. /// and if it does it is no longer valid to call the AddDependency/RemoveDependency functions.
  151. inline bool RemoveDependency(int inCount);
  152. /// Remove from the dependency counter. Job will be queued whenever the dependency counter reaches zero
  153. /// and if it does it is no longer valid to call the AddDependency/RemoveDependency functions.
  154. inline void RemoveDependencyAndQueue(int inCount);
  155. /// Set the job barrier that this job belongs to and returns false if this was not possible because the job already finished
  156. inline bool SetBarrier(Barrier *inBarrier)
  157. {
  158. intptr_t barrier = 0;
  159. if (mBarrier.compare_exchange_strong(barrier, reinterpret_cast<intptr_t>(inBarrier), memory_order_relaxed))
  160. return true;
  161. JPH_ASSERT(barrier == cBarrierDoneState, "A job can only belong to 1 barrier");
  162. return false;
  163. }
  164. /// Run the job function, returns the number of dependencies that this job still has or cExecutingState or cDoneState
  165. inline uint32 Execute()
  166. {
  167. // Transition job to executing state
  168. uint32 state = 0; // We can only start running with a dependency counter of 0
  169. if (!mNumDependencies.compare_exchange_strong(state, cExecutingState, memory_order_acquire))
  170. return state; // state is updated by compare_exchange_strong to the current value
  171. // Run the job function
  172. {
  173. JPH_PROFILE(mJobName, mColor.GetUInt32());
  174. mJobFunction();
  175. }
  176. // Fetch the barrier pointer and exchange it for the done state, so we're sure that no barrier gets set after we want to call the callback
  177. intptr_t barrier = mBarrier.load(memory_order_relaxed);
  178. for (;;)
  179. {
  180. if (mBarrier.compare_exchange_weak(barrier, cBarrierDoneState, memory_order_relaxed))
  181. break;
  182. }
  183. JPH_ASSERT(barrier != cBarrierDoneState);
  184. // Mark job as done
  185. state = cExecutingState;
  186. mNumDependencies.compare_exchange_strong(state, cDoneState, memory_order_relaxed);
  187. JPH_ASSERT(state == cExecutingState);
  188. // Notify the barrier after we've changed the job to the done state so that any thread reading the state after receiving the callback will see that the job has finished
  189. if (barrier != 0)
  190. reinterpret_cast<Barrier *>(barrier)->OnJobFinished(this);
  191. return cDoneState;
  192. }
  193. /// Test if the job can be executed
  194. inline bool CanBeExecuted() const { return mNumDependencies.load(memory_order_relaxed) == 0; }
  195. /// Test if the job finished executing
  196. inline bool IsDone() const { return mNumDependencies.load(memory_order_relaxed) == cDoneState; }
  197. static constexpr uint32 cExecutingState = 0xe0e0e0e0; ///< Value of mNumDependencies when job is executing
  198. static constexpr uint32 cDoneState = 0xd0d0d0d0; ///< Value of mNumDependencies when job is done executing
  199. static constexpr intptr_t cBarrierDoneState = ~intptr_t(0); ///< Value to use when the barrier has been triggered
  200. private:
  201. #if defined(JPH_EXTERNAL_PROFILE) || defined(JPH_PROFILE_ENABLED)
  202. const char * mJobName; ///< Name of the job
  203. Color mColor; ///< Color of the job in the profiler
  204. #endif // defined(JPH_EXTERNAL_PROFILE) || defined(JPH_PROFILE_ENABLED)
  205. JobSystem * mJobSystem; ///< The job system we belong to
  206. atomic<intptr_t> mBarrier = 0; ///< Barrier that this job is associated with (is a Barrier pointer)
  207. JobFunction mJobFunction; ///< Main job function
  208. atomic<uint32> mReferenceCount = 0; ///< Amount of JobHandles pointing to this job
  209. atomic<uint32> mNumDependencies; ///< Amount of jobs that need to complete before this job can run
  210. };
  211. /// Adds a job to the job queue
  212. virtual void QueueJob(Job *inJob) = 0;
  213. /// Adds a number of jobs at once to the job queue
  214. virtual void QueueJobs(Job **inJobs, uint inNumJobs) = 0;
  215. /// Frees a job
  216. virtual void FreeJob(Job *inJob) = 0;
  217. };
  218. using JobHandle = JobSystem::JobHandle;
  219. JPH_NAMESPACE_END
  220. #include "JobSystem.inl"