JobSystemThreadPool.cpp 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316
  1. // Jolt Physics Library (https://github.com/jrouwe/JoltPhysics)
  2. // SPDX-FileCopyrightText: 2021 Jorrit Rouwe
  3. // SPDX-License-Identifier: MIT
  4. #include <Jolt/Jolt.h>
  5. #include <Jolt/Core/JobSystemThreadPool.h>
  6. #include <Jolt/Core/Profiler.h>
  7. #include <Jolt/Core/FPException.h>
  8. #ifdef JPH_PLATFORM_WINDOWS
  9. JPH_SUPPRESS_WARNING_PUSH
  10. JPH_MSVC_SUPPRESS_WARNING(5039) // winbase.h(13179): warning C5039: 'TpSetCallbackCleanupGroup': pointer or reference to potentially throwing function passed to 'extern "C"' function under -EHc. Undefined behavior may occur if this function throws an exception.
  11. #define WIN32_LEAN_AND_MEAN
  12. #ifndef JPH_COMPILER_MINGW
  13. #include <Windows.h>
  14. #else
  15. #include <windows.h>
  16. #endif
  17. JPH_SUPPRESS_WARNING_POP
  18. #endif
  19. JPH_NAMESPACE_BEGIN
  20. void JobSystemThreadPool::Init(uint inMaxJobs, uint inMaxBarriers, int inNumThreads)
  21. {
  22. JobSystemWithBarrier::Init(inMaxBarriers);
  23. // Init freelist of jobs
  24. mJobs.Init(inMaxJobs, inMaxJobs);
  25. // Init queue
  26. for (atomic<Job *> &j : mQueue)
  27. j = nullptr;
  28. // Start the worker threads
  29. StartThreads(inNumThreads);
  30. }
  31. JobSystemThreadPool::JobSystemThreadPool(uint inMaxJobs, uint inMaxBarriers, int inNumThreads)
  32. {
  33. Init(inMaxJobs, inMaxBarriers, inNumThreads);
  34. }
  35. void JobSystemThreadPool::StartThreads(int inNumThreads)
  36. {
  37. // Auto detect number of threads
  38. if (inNumThreads < 0)
  39. inNumThreads = thread::hardware_concurrency() - 1;
  40. // If no threads are requested we're done
  41. if (inNumThreads == 0)
  42. return;
  43. // Don't quit the threads
  44. mQuit = false;
  45. // Allocate heads
  46. mHeads = reinterpret_cast<atomic<uint> *>(Allocate(sizeof(atomic<uint>) * inNumThreads));
  47. for (int i = 0; i < inNumThreads; ++i)
  48. mHeads[i] = 0;
  49. // Start running threads
  50. JPH_ASSERT(mThreads.empty());
  51. mThreads.reserve(inNumThreads);
  52. for (int i = 0; i < inNumThreads; ++i)
  53. mThreads.emplace_back([this, i] { ThreadMain(i); });
  54. }
  55. JobSystemThreadPool::~JobSystemThreadPool()
  56. {
  57. // Stop all worker threads
  58. StopThreads();
  59. }
  60. void JobSystemThreadPool::StopThreads()
  61. {
  62. if (mThreads.empty())
  63. return;
  64. // Signal threads that we want to stop and wake them up
  65. mQuit = true;
  66. mSemaphore.Release((uint)mThreads.size());
  67. // Wait for all threads to finish
  68. for (thread &t : mThreads)
  69. if (t.joinable())
  70. t.join();
  71. // Delete all threads
  72. mThreads.clear();
  73. // Ensure that there are no lingering jobs in the queue
  74. for (uint head = 0; head != mTail; ++head)
  75. {
  76. // Fetch job
  77. Job *job_ptr = mQueue[head & (cQueueLength - 1)].exchange(nullptr);
  78. if (job_ptr != nullptr)
  79. {
  80. // And execute it
  81. job_ptr->Execute();
  82. job_ptr->Release();
  83. }
  84. }
  85. // Destroy heads and reset tail
  86. Free(mHeads);
  87. mHeads = nullptr;
  88. mTail = 0;
  89. }
  90. JobHandle JobSystemThreadPool::CreateJob(const char *inJobName, ColorArg inColor, const JobFunction &inJobFunction, uint32 inNumDependencies)
  91. {
  92. JPH_PROFILE_FUNCTION();
  93. // Loop until we can get a job from the free list
  94. uint32 index;
  95. for (;;)
  96. {
  97. index = mJobs.ConstructObject(inJobName, inColor, this, inJobFunction, inNumDependencies);
  98. if (index != AvailableJobs::cInvalidObjectIndex)
  99. break;
  100. JPH_ASSERT(false, "No jobs available!");
  101. std::this_thread::sleep_for(std::chrono::microseconds(100));
  102. }
  103. Job *job = &mJobs.Get(index);
  104. // Construct handle to keep a reference, the job is queued below and may immediately complete
  105. JobHandle handle(job);
  106. // If there are no dependencies, queue the job now
  107. if (inNumDependencies == 0)
  108. QueueJob(job);
  109. // Return the handle
  110. return handle;
  111. }
  112. void JobSystemThreadPool::FreeJob(Job *inJob)
  113. {
  114. mJobs.DestructObject(inJob);
  115. }
  116. uint JobSystemThreadPool::GetHead() const
  117. {
  118. // Find the minimal value across all threads
  119. uint head = mTail;
  120. for (size_t i = 0; i < mThreads.size(); ++i)
  121. head = min(head, mHeads[i].load());
  122. return head;
  123. }
  124. void JobSystemThreadPool::QueueJobInternal(Job *inJob)
  125. {
  126. // Add reference to job because we're adding the job to the queue
  127. inJob->AddRef();
  128. // Need to read head first because otherwise the tail can already have passed the head
  129. // We read the head outside of the loop since it involves iterating over all threads and we only need to update
  130. // it if there's not enough space in the queue.
  131. uint head = GetHead();
  132. for (;;)
  133. {
  134. // Check if there's space in the queue
  135. uint old_value = mTail;
  136. if (old_value - head >= cQueueLength)
  137. {
  138. // We calculated the head outside of the loop, update head (and we also need to update tail to prevent it from passing head)
  139. head = GetHead();
  140. old_value = mTail;
  141. // Second check if there's space in the queue
  142. if (old_value - head >= cQueueLength)
  143. {
  144. // Wake up all threads in order to ensure that they can clear any nullptrs they may not have processed yet
  145. mSemaphore.Release((uint)mThreads.size());
  146. // Sleep a little (we have to wait for other threads to update their head pointer in order for us to be able to continue)
  147. std::this_thread::sleep_for(std::chrono::microseconds(100));
  148. continue;
  149. }
  150. }
  151. // Write the job pointer if the slot is empty
  152. Job *expected_job = nullptr;
  153. bool success = mQueue[old_value & (cQueueLength - 1)].compare_exchange_strong(expected_job, inJob);
  154. // Regardless of who wrote the slot, we will update the tail (if the successful thread got scheduled out
  155. // after writing the pointer we still want to be able to continue)
  156. mTail.compare_exchange_strong(old_value, old_value + 1);
  157. // If we successfully added our job we're done
  158. if (success)
  159. break;
  160. }
  161. }
  162. void JobSystemThreadPool::QueueJob(Job *inJob)
  163. {
  164. JPH_PROFILE_FUNCTION();
  165. // If we have no worker threads, we can't queue the job either. We assume in this case that the job will be added to a barrier and that the barrier will execute the job when it's Wait() function is called.
  166. if (mThreads.empty())
  167. return;
  168. // Queue the job
  169. QueueJobInternal(inJob);
  170. // Wake up thread
  171. mSemaphore.Release();
  172. }
  173. void JobSystemThreadPool::QueueJobs(Job **inJobs, uint inNumJobs)
  174. {
  175. JPH_PROFILE_FUNCTION();
  176. JPH_ASSERT(inNumJobs > 0);
  177. // If we have no worker threads, we can't queue the job either. We assume in this case that the job will be added to a barrier and that the barrier will execute the job when it's Wait() function is called.
  178. if (mThreads.empty())
  179. return;
  180. // Queue all jobs
  181. for (Job **job = inJobs, **job_end = inJobs + inNumJobs; job < job_end; ++job)
  182. QueueJobInternal(*job);
  183. // Wake up threads
  184. mSemaphore.Release(min(inNumJobs, (uint)mThreads.size()));
  185. }
  186. #if defined(JPH_PLATFORM_WINDOWS) && !defined(JPH_COMPILER_MINGW) // MinGW doesn't support __try/__except
  187. // Sets the current thread name in MSVC debugger
  188. static void SetThreadName(const char *inName)
  189. {
  190. #pragma pack(push, 8)
  191. struct THREADNAME_INFO
  192. {
  193. DWORD dwType; // Must be 0x1000.
  194. LPCSTR szName; // Pointer to name (in user addr space).
  195. DWORD dwThreadID; // Thread ID (-1=caller thread).
  196. DWORD dwFlags; // Reserved for future use, must be zero.
  197. };
  198. #pragma pack(pop)
  199. THREADNAME_INFO info;
  200. info.dwType = 0x1000;
  201. info.szName = inName;
  202. info.dwThreadID = (DWORD)-1;
  203. info.dwFlags = 0;
  204. __try
  205. {
  206. RaiseException(0x406D1388, 0, sizeof(info) / sizeof(ULONG_PTR), (ULONG_PTR *)&info);
  207. }
  208. __except(EXCEPTION_EXECUTE_HANDLER)
  209. {
  210. }
  211. }
  212. #endif // JPH_PLATFORM_WINDOWS && !JPH_COMPILER_MINGW
  213. void JobSystemThreadPool::ThreadMain(int inThreadIndex)
  214. {
  215. // Name the thread
  216. char name[64];
  217. snprintf(name, sizeof(name), "Worker %d", int(inThreadIndex + 1));
  218. #if defined(JPH_PLATFORM_WINDOWS) && !defined(JPH_COMPILER_MINGW)
  219. SetThreadName(name);
  220. #endif // JPH_PLATFORM_WINDOWS && !JPH_COMPILER_MINGW
  221. // Enable floating point exceptions
  222. FPExceptionsEnable enable_exceptions;
  223. JPH_UNUSED(enable_exceptions);
  224. JPH_PROFILE_THREAD_START(name);
  225. atomic<uint> &head = mHeads[inThreadIndex];
  226. while (!mQuit)
  227. {
  228. // Wait for jobs
  229. mSemaphore.Acquire();
  230. {
  231. JPH_PROFILE("Executing Jobs");
  232. // Loop over the queue
  233. while (head != mTail)
  234. {
  235. // Exchange any job pointer we find with a nullptr
  236. atomic<Job *> &job = mQueue[head & (cQueueLength - 1)];
  237. if (job.load() != nullptr)
  238. {
  239. Job *job_ptr = job.exchange(nullptr);
  240. if (job_ptr != nullptr)
  241. {
  242. // And execute it
  243. job_ptr->Execute();
  244. job_ptr->Release();
  245. }
  246. }
  247. head++;
  248. }
  249. }
  250. }
  251. JPH_PROFILE_THREAD_END();
  252. }
  253. JPH_NAMESPACE_END