// JobSystemThreadPool.cpp (15 KB)
  1. // Jolt Physics Library (https://github.com/jrouwe/JoltPhysics)
  2. // SPDX-FileCopyrightText: 2021 Jorrit Rouwe
  3. // SPDX-License-Identifier: MIT
  4. #include <Jolt/Jolt.h>
  5. #include <Jolt/Core/JobSystemThreadPool.h>
  6. #include <Jolt/Core/Profiler.h>
  7. #include <Jolt/Core/FPException.h>
  8. #ifdef JPH_PLATFORM_WINDOWS
  9. JPH_SUPPRESS_WARNING_PUSH
  10. JPH_MSVC_SUPPRESS_WARNING(5039) // winbase.h(13179): warning C5039: 'TpSetCallbackCleanupGroup': pointer or reference to potentially throwing function passed to 'extern "C"' function under -EHc. Undefined behavior may occur if this function throws an exception.
  11. #define WIN32_LEAN_AND_MEAN
  12. #ifndef JPH_COMPILER_MINGW
  13. #include <Windows.h>
  14. #else
  15. #include <windows.h>
  16. #endif
  17. JPH_SUPPRESS_WARNING_POP
  18. #endif
  19. JPH_NAMESPACE_BEGIN
  20. JobSystemThreadPool::Semaphore::Semaphore()
  21. {
  22. #ifdef JPH_PLATFORM_WINDOWS
  23. mSemaphore = CreateSemaphore(nullptr, 0, INT_MAX, nullptr);
  24. #endif
  25. }
  26. JobSystemThreadPool::Semaphore::~Semaphore()
  27. {
  28. #ifdef JPH_PLATFORM_WINDOWS
  29. CloseHandle(mSemaphore);
  30. #endif
  31. }
  32. void JobSystemThreadPool::Semaphore::Release(uint inNumber)
  33. {
  34. JPH_ASSERT(inNumber > 0);
  35. #ifdef JPH_PLATFORM_WINDOWS
  36. int old_value = mCount.fetch_add(inNumber);
  37. if (old_value < 0)
  38. {
  39. int new_value = old_value + (int)inNumber;
  40. int num_to_release = min(new_value, 0) - old_value;
  41. ::ReleaseSemaphore(mSemaphore, num_to_release, nullptr);
  42. }
  43. #else
  44. lock_guard lock(mLock);
  45. mCount += (int)inNumber;
  46. if (inNumber > 1)
  47. mWaitVariable.notify_all();
  48. else
  49. mWaitVariable.notify_one();
  50. #endif
  51. }
  52. void JobSystemThreadPool::Semaphore::Acquire(uint inNumber)
  53. {
  54. JPH_ASSERT(inNumber > 0);
  55. #ifdef JPH_PLATFORM_WINDOWS
  56. int old_value = mCount.fetch_sub(inNumber);
  57. int new_value = old_value - (int)inNumber;
  58. if (new_value < 0)
  59. {
  60. int num_to_acquire = min(old_value, 0) - new_value;
  61. for (int i = 0; i < num_to_acquire; ++i)
  62. WaitForSingleObject(mSemaphore, INFINITE);
  63. }
  64. #else
  65. unique_lock lock(mLock);
  66. mCount -= (int)inNumber;
  67. mWaitVariable.wait(lock, [this]() { return mCount >= 0; });
  68. #endif
  69. }
  70. JobSystemThreadPool::BarrierImpl::BarrierImpl()
  71. {
  72. for (atomic<Job *> &j : mJobs)
  73. j = nullptr;
  74. }
  75. JobSystemThreadPool::BarrierImpl::~BarrierImpl()
  76. {
  77. JPH_ASSERT(IsEmpty());
  78. }
  79. void JobSystemThreadPool::BarrierImpl::AddJob(const JobHandle &inJob)
  80. {
  81. JPH_PROFILE_FUNCTION();
  82. bool release_semaphore = false;
  83. // Set the barrier on the job, this returns true if the barrier was successfully set (otherwise the job is already done and we don't need to add it to our list)
  84. Job *job = inJob.GetPtr();
  85. if (job->SetBarrier(this))
  86. {
  87. // If the job can be executed we want to release the semaphore an extra time to allow the waiting thread to start executing it
  88. mNumToAcquire++;
  89. if (job->CanBeExecuted())
  90. {
  91. release_semaphore = true;
  92. mNumToAcquire++;
  93. }
  94. // Add the job to our job list
  95. job->AddRef();
  96. uint write_index = mJobWriteIndex++;
  97. while (write_index - mJobReadIndex >= cMaxJobs)
  98. {
  99. JPH_ASSERT(false, "Barrier full, stalling!");
  100. std::this_thread::sleep_for(std::chrono::microseconds(100));
  101. }
  102. mJobs[write_index & (cMaxJobs - 1)] = job;
  103. }
  104. // Notify waiting thread that a new executable job is available
  105. if (release_semaphore)
  106. mSemaphore.Release();
  107. }
  108. void JobSystemThreadPool::BarrierImpl::AddJobs(const JobHandle *inHandles, uint inNumHandles)
  109. {
  110. JPH_PROFILE_FUNCTION();
  111. bool release_semaphore = false;
  112. for (const JobHandle *handle = inHandles, *handles_end = inHandles + inNumHandles; handle < handles_end; ++handle)
  113. {
  114. // Set the barrier on the job, this returns true if the barrier was successfully set (otherwise the job is already done and we don't need to add it to our list)
  115. Job *job = handle->GetPtr();
  116. if (job->SetBarrier(this))
  117. {
  118. // If the job can be executed we want to release the semaphore an extra time to allow the waiting thread to start executing it
  119. mNumToAcquire++;
  120. if (!release_semaphore && job->CanBeExecuted())
  121. {
  122. release_semaphore = true;
  123. mNumToAcquire++;
  124. }
  125. // Add the job to our job list
  126. job->AddRef();
  127. uint write_index = mJobWriteIndex++;
  128. while (write_index - mJobReadIndex >= cMaxJobs)
  129. {
  130. JPH_ASSERT(false, "Barrier full, stalling!");
  131. std::this_thread::sleep_for(std::chrono::microseconds(100));
  132. }
  133. mJobs[write_index & (cMaxJobs - 1)] = job;
  134. }
  135. }
  136. // Notify waiting thread that a new executable job is available
  137. if (release_semaphore)
  138. mSemaphore.Release();
  139. }
  140. void JobSystemThreadPool::BarrierImpl::OnJobFinished(Job *inJob)
  141. {
  142. JPH_PROFILE_FUNCTION();
  143. mSemaphore.Release();
  144. }
  145. void JobSystemThreadPool::BarrierImpl::Wait()
  146. {
  147. while (mNumToAcquire > 0)
  148. {
  149. {
  150. JPH_PROFILE("Execute Jobs");
  151. // Go through all jobs
  152. bool has_executed;
  153. do
  154. {
  155. has_executed = false;
  156. // Loop through the jobs and erase jobs from the beginning of the list that are done
  157. while (mJobReadIndex < mJobWriteIndex)
  158. {
  159. atomic<Job *> &job = mJobs[mJobReadIndex & (cMaxJobs - 1)];
  160. Job *job_ptr = job.load();
  161. if (job_ptr == nullptr || !job_ptr->IsDone())
  162. break;
  163. // Job is finished, release it
  164. job_ptr->Release();
  165. job = nullptr;
  166. ++mJobReadIndex;
  167. }
  168. // Loop through the jobs and execute the first executable job
  169. for (uint index = mJobReadIndex; index < mJobWriteIndex; ++index)
  170. {
  171. const atomic<Job *> &job = mJobs[index & (cMaxJobs - 1)];
  172. Job *job_ptr = job.load();
  173. if (job_ptr != nullptr && job_ptr->CanBeExecuted())
  174. {
  175. // This will only execute the job if it has not already executed
  176. job_ptr->Execute();
  177. has_executed = true;
  178. break;
  179. }
  180. }
  181. } while (has_executed);
  182. }
  183. // Wait for another thread to wake us when either there is more work to do or when all jobs have completed
  184. int num_to_acquire = max(1, mSemaphore.GetValue()); // When there have been multiple releases, we acquire them all at the same time to avoid needlessly spinning on executing jobs
  185. mSemaphore.Acquire(num_to_acquire);
  186. mNumToAcquire -= num_to_acquire;
  187. }
  188. // All jobs should be done now, release them
  189. while (mJobReadIndex < mJobWriteIndex)
  190. {
  191. atomic<Job *> &job = mJobs[mJobReadIndex & (cMaxJobs - 1)];
  192. Job *job_ptr = job.load();
  193. JPH_ASSERT(job_ptr != nullptr && job_ptr->IsDone());
  194. job_ptr->Release();
  195. job = nullptr;
  196. ++mJobReadIndex;
  197. }
  198. }
  199. void JobSystemThreadPool::Init(uint inMaxJobs, uint inMaxBarriers, int inNumThreads)
  200. {
  201. JPH_ASSERT(mBarriers == nullptr); // Already initialized?
  202. // Init freelist of barriers
  203. mMaxBarriers = inMaxBarriers;
  204. mBarriers = new BarrierImpl [inMaxBarriers];
  205. // Init freelist of jobs
  206. mJobs.Init(inMaxJobs, inMaxJobs);
  207. // Init queue
  208. for (atomic<Job *> &j : mQueue)
  209. j = nullptr;
  210. // Start the worker threads
  211. StartThreads(inNumThreads);
  212. }
  213. JobSystemThreadPool::JobSystemThreadPool(uint inMaxJobs, uint inMaxBarriers, int inNumThreads)
  214. {
  215. Init(inMaxJobs, inMaxBarriers, inNumThreads);
  216. }
  217. void JobSystemThreadPool::StartThreads(int inNumThreads)
  218. {
  219. // Auto detect number of threads
  220. if (inNumThreads < 0)
  221. inNumThreads = thread::hardware_concurrency() - 1;
  222. // If no threads are requested we're done
  223. if (inNumThreads == 0)
  224. return;
  225. // Don't quit the threads
  226. mQuit = false;
  227. // Allocate heads
  228. mHeads = reinterpret_cast<atomic<uint> *>(Allocate(sizeof(atomic<uint>) * inNumThreads));
  229. for (int i = 0; i < inNumThreads; ++i)
  230. mHeads[i] = 0;
  231. // Start running threads
  232. JPH_ASSERT(mThreads.empty());
  233. mThreads.reserve(inNumThreads);
  234. for (int i = 0; i < inNumThreads; ++i)
  235. mThreads.emplace_back([this, i] { ThreadMain(i); });
  236. }
  237. JobSystemThreadPool::~JobSystemThreadPool()
  238. {
  239. // Stop all worker threads
  240. StopThreads();
  241. // Ensure that none of the barriers are used
  242. #ifdef JPH_ENABLE_ASSERTS
  243. for (const BarrierImpl *b = mBarriers, *b_end = mBarriers + mMaxBarriers; b < b_end; ++b)
  244. JPH_ASSERT(!b->mInUse);
  245. #endif // JPH_ENABLE_ASSERTS
  246. delete [] mBarriers;
  247. }
  248. void JobSystemThreadPool::StopThreads()
  249. {
  250. if (mThreads.empty())
  251. return;
  252. // Signal threads that we want to stop and wake them up
  253. mQuit = true;
  254. mSemaphore.Release((uint)mThreads.size());
  255. // Wait for all threads to finish
  256. for (thread &t : mThreads)
  257. if (t.joinable())
  258. t.join();
  259. // Delete all threads
  260. mThreads.clear();
  261. // Ensure that there are no lingering jobs in the queue
  262. for (uint head = 0; head != mTail; ++head)
  263. {
  264. // Fetch job
  265. Job *job_ptr = mQueue[head & (cQueueLength - 1)].exchange(nullptr);
  266. if (job_ptr != nullptr)
  267. {
  268. // And execute it
  269. job_ptr->Execute();
  270. job_ptr->Release();
  271. }
  272. }
  273. // Destroy heads and reset tail
  274. Free(mHeads);
  275. mHeads = nullptr;
  276. mTail = 0;
  277. }
  278. JobHandle JobSystemThreadPool::CreateJob(const char *inJobName, ColorArg inColor, const JobFunction &inJobFunction, uint32 inNumDependencies)
  279. {
  280. JPH_PROFILE_FUNCTION();
  281. // Loop until we can get a job from the free list
  282. uint32 index;
  283. for (;;)
  284. {
  285. index = mJobs.ConstructObject(inJobName, inColor, this, inJobFunction, inNumDependencies);
  286. if (index != AvailableJobs::cInvalidObjectIndex)
  287. break;
  288. JPH_ASSERT(false, "No jobs available!");
  289. std::this_thread::sleep_for(std::chrono::microseconds(100));
  290. }
  291. Job *job = &mJobs.Get(index);
  292. // Construct handle to keep a reference, the job is queued below and may immediately complete
  293. JobHandle handle(job);
  294. // If there are no dependencies, queue the job now
  295. if (inNumDependencies == 0)
  296. QueueJob(job);
  297. // Return the handle
  298. return handle;
  299. }
  300. void JobSystemThreadPool::FreeJob(Job *inJob)
  301. {
  302. mJobs.DestructObject(inJob);
  303. }
  304. JobSystem::Barrier *JobSystemThreadPool::CreateBarrier()
  305. {
  306. JPH_PROFILE_FUNCTION();
  307. // Find the first unused barrier
  308. for (uint32 index = 0; index < mMaxBarriers; ++index)
  309. {
  310. bool expected = false;
  311. if (mBarriers[index].mInUse.compare_exchange_strong(expected, true))
  312. return &mBarriers[index];
  313. }
  314. return nullptr;
  315. }
  316. void JobSystemThreadPool::DestroyBarrier(Barrier *inBarrier)
  317. {
  318. JPH_PROFILE_FUNCTION();
  319. // Check that no jobs are in the barrier
  320. JPH_ASSERT(static_cast<BarrierImpl *>(inBarrier)->IsEmpty());
  321. // Flag the barrier as unused
  322. bool expected = true;
  323. static_cast<BarrierImpl *>(inBarrier)->mInUse.compare_exchange_strong(expected, false);
  324. JPH_ASSERT(expected);
  325. }
  326. void JobSystemThreadPool::WaitForJobs(Barrier *inBarrier)
  327. {
  328. JPH_PROFILE_FUNCTION();
  329. // Let our barrier implementation wait for the jobs
  330. static_cast<BarrierImpl *>(inBarrier)->Wait();
  331. }
  332. uint JobSystemThreadPool::GetHead() const
  333. {
  334. // Find the minimal value across all threads
  335. uint head = mTail;
  336. for (size_t i = 0; i < mThreads.size(); ++i)
  337. head = min(head, mHeads[i].load());
  338. return head;
  339. }
  340. void JobSystemThreadPool::QueueJobInternal(Job *inJob)
  341. {
  342. // Add reference to job because we're adding the job to the queue
  343. inJob->AddRef();
  344. // Need to read head first because otherwise the tail can already have passed the head
  345. // We read the head outside of the loop since it involves iterating over all threads and we only need to update
  346. // it if there's not enough space in the queue.
  347. uint head = GetHead();
  348. for (;;)
  349. {
  350. // Check if there's space in the queue
  351. uint old_value = mTail;
  352. if (old_value - head >= cQueueLength)
  353. {
  354. // We calculated the head outside of the loop, update head (and we also need to update tail to prevent it from passing head)
  355. head = GetHead();
  356. old_value = mTail;
  357. // Second check if there's space in the queue
  358. if (old_value - head >= cQueueLength)
  359. {
  360. // Wake up all threads in order to ensure that they can clear any nullptrs they may not have processed yet
  361. mSemaphore.Release((uint)mThreads.size());
  362. // Sleep a little (we have to wait for other threads to update their head pointer in order for us to be able to continue)
  363. std::this_thread::sleep_for(std::chrono::microseconds(100));
  364. continue;
  365. }
  366. }
  367. // Write the job pointer if the slot is empty
  368. Job *expected_job = nullptr;
  369. bool success = mQueue[old_value & (cQueueLength - 1)].compare_exchange_strong(expected_job, inJob);
  370. // Regardless of who wrote the slot, we will update the tail (if the successful thread got scheduled out
  371. // after writing the pointer we still want to be able to continue)
  372. mTail.compare_exchange_strong(old_value, old_value + 1);
  373. // If we successfully added our job we're done
  374. if (success)
  375. break;
  376. }
  377. }
  378. void JobSystemThreadPool::QueueJob(Job *inJob)
  379. {
  380. JPH_PROFILE_FUNCTION();
  381. // If we have no worker threads, we can't queue the job either. We assume in this case that the job will be added to a barrier and that the barrier will execute the job when it's Wait() function is called.
  382. if (mThreads.empty())
  383. return;
  384. // Queue the job
  385. QueueJobInternal(inJob);
  386. // Wake up thread
  387. mSemaphore.Release();
  388. }
  389. void JobSystemThreadPool::QueueJobs(Job **inJobs, uint inNumJobs)
  390. {
  391. JPH_PROFILE_FUNCTION();
  392. JPH_ASSERT(inNumJobs > 0);
  393. // If we have no worker threads, we can't queue the job either. We assume in this case that the job will be added to a barrier and that the barrier will execute the job when it's Wait() function is called.
  394. if (mThreads.empty())
  395. return;
  396. // Queue all jobs
  397. for (Job **job = inJobs, **job_end = inJobs + inNumJobs; job < job_end; ++job)
  398. QueueJobInternal(*job);
  399. // Wake up threads
  400. mSemaphore.Release(min(inNumJobs, (uint)mThreads.size()));
  401. }
  #if defined(JPH_PLATFORM_WINDOWS) && !defined(JPH_COMPILER_MINGW) // MinGW doesn't support __try/__except
  // Sets the name of the current thread as shown in the MSVC debugger.
  // The name is passed to the debugger by raising an exception that carries a THREADNAME_INFO block.
  // @param inName Name to give to the calling thread
  static void SetThreadName(const char *inName)
  {
      // Exception code that is raised to deliver the name
      const DWORD thread_name_exception = 0x406D1388;

  #pragma pack(push, 8)
      struct THREADNAME_INFO
      {
          DWORD dwType; // Must be 0x1000.
          LPCSTR szName; // Pointer to name (in user addr space).
          DWORD dwThreadID; // Thread ID (-1=caller thread).
          DWORD dwFlags; // Reserved for future use, must be zero.
      };
  #pragma pack(pop)

      // Members in declaration order: type, name, thread id (-1 = calling thread), flags
      THREADNAME_INFO info = { 0x1000, inName, (DWORD)-1, 0 };

      __try
      {
          RaiseException(thread_name_exception, 0, sizeof(info) / sizeof(ULONG_PTR), (ULONG_PTR *)&info);
      }
      __except(EXCEPTION_EXECUTE_HANDLER)
      {
          // Swallow the exception here so that raising it has no further effect on this thread
      }
  }
  #endif // JPH_PLATFORM_WINDOWS && !JPH_COMPILER_MINGW
  429. void JobSystemThreadPool::ThreadMain(int inThreadIndex)
  430. {
  431. // Name the thread
  432. char name[64];
  433. snprintf(name, sizeof(name), "Worker %d", int(inThreadIndex + 1));
  434. #if defined(JPH_PLATFORM_WINDOWS) && !defined(JPH_COMPILER_MINGW)
  435. SetThreadName(name);
  436. #endif // JPH_PLATFORM_WINDOWS && !JPH_COMPILER_MINGW
  437. // Enable floating point exceptions
  438. FPExceptionsEnable enable_exceptions;
  439. JPH_UNUSED(enable_exceptions);
  440. JPH_PROFILE_THREAD_START(name);
  441. atomic<uint> &head = mHeads[inThreadIndex];
  442. while (!mQuit)
  443. {
  444. // Wait for jobs
  445. mSemaphore.Acquire();
  446. {
  447. JPH_PROFILE("Executing Jobs");
  448. // Loop over the queue
  449. while (head != mTail)
  450. {
  451. // Exchange any job pointer we find with a nullptr
  452. atomic<Job *> &job = mQueue[head & (cQueueLength - 1)];
  453. if (job.load() != nullptr)
  454. {
  455. Job *job_ptr = job.exchange(nullptr);
  456. if (job_ptr != nullptr)
  457. {
  458. // And execute it
  459. job_ptr->Execute();
  460. job_ptr->Release();
  461. }
  462. }
  463. head++;
  464. }
  465. }
  466. }
  467. JPH_PROFILE_THREAD_END();
  468. }
  469. JPH_NAMESPACE_END