JobSystemWithBarrier.cpp 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. // Jolt Physics Library (https://github.com/jrouwe/JoltPhysics)
  2. // SPDX-FileCopyrightText: 2023 Jorrit Rouwe
  3. // SPDX-License-Identifier: MIT
  4. #include <Jolt/Jolt.h>
  5. #include <Jolt/Core/JobSystemWithBarrier.h>
  6. #include <Jolt/Core/Profiler.h>
  7. JPH_SUPPRESS_WARNINGS_STD_BEGIN
  8. #include <thread>
  9. JPH_SUPPRESS_WARNINGS_STD_END
  10. JPH_NAMESPACE_BEGIN
  11. JobSystemWithBarrier::BarrierImpl::BarrierImpl()
  12. {
  13. for (atomic<Job *> &j : mJobs)
  14. j = nullptr;
  15. }
  16. JobSystemWithBarrier::BarrierImpl::~BarrierImpl()
  17. {
  18. JPH_ASSERT(IsEmpty());
  19. }
  20. void JobSystemWithBarrier::BarrierImpl::AddJob(const JobHandle &inJob)
  21. {
  22. JPH_PROFILE_FUNCTION();
  23. bool release_semaphore = false;
  24. // Set the barrier on the job, this returns true if the barrier was successfully set (otherwise the job is already done and we don't need to add it to our list)
  25. Job *job = inJob.GetPtr();
  26. if (job->SetBarrier(this))
  27. {
  28. // If the job can be executed we want to release the semaphore an extra time to allow the waiting thread to start executing it
  29. mNumToAcquire++;
  30. if (job->CanBeExecuted())
  31. {
  32. release_semaphore = true;
  33. mNumToAcquire++;
  34. }
  35. // Add the job to our job list
  36. job->AddRef();
  37. uint write_index = mJobWriteIndex++;
  38. while (write_index - mJobReadIndex >= cMaxJobs)
  39. {
  40. JPH_ASSERT(false, "Barrier full, stalling!");
  41. std::this_thread::sleep_for(std::chrono::microseconds(100));
  42. }
  43. mJobs[write_index & (cMaxJobs - 1)] = job;
  44. }
  45. // Notify waiting thread that a new executable job is available
  46. if (release_semaphore)
  47. mSemaphore.Release();
  48. }
  49. void JobSystemWithBarrier::BarrierImpl::AddJobs(const JobHandle *inHandles, uint inNumHandles)
  50. {
  51. JPH_PROFILE_FUNCTION();
  52. bool release_semaphore = false;
  53. for (const JobHandle *handle = inHandles, *handles_end = inHandles + inNumHandles; handle < handles_end; ++handle)
  54. {
  55. // Set the barrier on the job, this returns true if the barrier was successfully set (otherwise the job is already done and we don't need to add it to our list)
  56. Job *job = handle->GetPtr();
  57. if (job->SetBarrier(this))
  58. {
  59. // If the job can be executed we want to release the semaphore an extra time to allow the waiting thread to start executing it
  60. mNumToAcquire++;
  61. if (!release_semaphore && job->CanBeExecuted())
  62. {
  63. release_semaphore = true;
  64. mNumToAcquire++;
  65. }
  66. // Add the job to our job list
  67. job->AddRef();
  68. uint write_index = mJobWriteIndex++;
  69. while (write_index - mJobReadIndex >= cMaxJobs)
  70. {
  71. JPH_ASSERT(false, "Barrier full, stalling!");
  72. std::this_thread::sleep_for(std::chrono::microseconds(100));
  73. }
  74. mJobs[write_index & (cMaxJobs - 1)] = job;
  75. }
  76. }
  77. // Notify waiting thread that a new executable job is available
  78. if (release_semaphore)
  79. mSemaphore.Release();
  80. }
  81. void JobSystemWithBarrier::BarrierImpl::OnJobFinished(Job *inJob)
  82. {
  83. JPH_PROFILE_FUNCTION();
  84. mSemaphore.Release();
  85. }
  86. void JobSystemWithBarrier::BarrierImpl::Wait()
  87. {
  88. while (mNumToAcquire > 0)
  89. {
  90. {
  91. JPH_PROFILE("Execute Jobs");
  92. // Go through all jobs
  93. bool has_executed;
  94. do
  95. {
  96. has_executed = false;
  97. // Loop through the jobs and erase jobs from the beginning of the list that are done
  98. while (mJobReadIndex < mJobWriteIndex)
  99. {
  100. atomic<Job *> &job = mJobs[mJobReadIndex & (cMaxJobs - 1)];
  101. Job *job_ptr = job.load();
  102. if (job_ptr == nullptr || !job_ptr->IsDone())
  103. break;
  104. // Job is finished, release it
  105. job_ptr->Release();
  106. job = nullptr;
  107. ++mJobReadIndex;
  108. }
  109. // Loop through the jobs and execute the first executable job
  110. for (uint index = mJobReadIndex; index < mJobWriteIndex; ++index)
  111. {
  112. const atomic<Job *> &job = mJobs[index & (cMaxJobs - 1)];
  113. Job *job_ptr = job.load();
  114. if (job_ptr != nullptr && job_ptr->CanBeExecuted())
  115. {
  116. // This will only execute the job if it has not already executed
  117. job_ptr->Execute();
  118. has_executed = true;
  119. break;
  120. }
  121. }
  122. } while (has_executed);
  123. }
  124. // Wait for another thread to wake us when either there is more work to do or when all jobs have completed
  125. int num_to_acquire = max(1, mSemaphore.GetValue()); // When there have been multiple releases, we acquire them all at the same time to avoid needlessly spinning on executing jobs
  126. mSemaphore.Acquire(num_to_acquire);
  127. mNumToAcquire -= num_to_acquire;
  128. }
  129. // All jobs should be done now, release them
  130. while (mJobReadIndex < mJobWriteIndex)
  131. {
  132. atomic<Job *> &job = mJobs[mJobReadIndex & (cMaxJobs - 1)];
  133. Job *job_ptr = job.load();
  134. JPH_ASSERT(job_ptr != nullptr && job_ptr->IsDone());
  135. job_ptr->Release();
  136. job = nullptr;
  137. ++mJobReadIndex;
  138. }
  139. }
  140. void JobSystemWithBarrier::Init(uint inMaxBarriers)
  141. {
  142. JPH_ASSERT(mBarriers == nullptr); // Already initialized?
  143. // Init freelist of barriers
  144. mMaxBarriers = inMaxBarriers;
  145. mBarriers = new BarrierImpl [inMaxBarriers];
  146. }
  147. JobSystemWithBarrier::JobSystemWithBarrier(uint inMaxBarriers)
  148. {
  149. Init(inMaxBarriers);
  150. }
  151. JobSystemWithBarrier::~JobSystemWithBarrier()
  152. {
  153. // Ensure that none of the barriers are used
  154. #ifdef JPH_ENABLE_ASSERTS
  155. for (const BarrierImpl *b = mBarriers, *b_end = mBarriers + mMaxBarriers; b < b_end; ++b)
  156. JPH_ASSERT(!b->mInUse);
  157. #endif // JPH_ENABLE_ASSERTS
  158. delete [] mBarriers;
  159. }
  160. JobSystem::Barrier *JobSystemWithBarrier::CreateBarrier()
  161. {
  162. JPH_PROFILE_FUNCTION();
  163. // Find the first unused barrier
  164. for (uint32 index = 0; index < mMaxBarriers; ++index)
  165. {
  166. bool expected = false;
  167. if (mBarriers[index].mInUse.compare_exchange_strong(expected, true))
  168. return &mBarriers[index];
  169. }
  170. return nullptr;
  171. }
  172. void JobSystemWithBarrier::DestroyBarrier(Barrier *inBarrier)
  173. {
  174. JPH_PROFILE_FUNCTION();
  175. // Check that no jobs are in the barrier
  176. JPH_ASSERT(static_cast<BarrierImpl *>(inBarrier)->IsEmpty());
  177. // Flag the barrier as unused
  178. bool expected = true;
  179. static_cast<BarrierImpl *>(inBarrier)->mInUse.compare_exchange_strong(expected, false);
  180. JPH_ASSERT(expected);
  181. }
  182. void JobSystemWithBarrier::WaitForJobs(Barrier *inBarrier)
  183. {
  184. JPH_PROFILE_FUNCTION();
  185. // Let our barrier implementation wait for the jobs
  186. static_cast<BarrierImpl *>(inBarrier)->Wait();
  187. }
  188. JPH_NAMESPACE_END