JobSystemThreadPool.cpp 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. // Jolt Physics Library (https://github.com/jrouwe/JoltPhysics)
  2. // SPDX-FileCopyrightText: 2021 Jorrit Rouwe
  3. // SPDX-License-Identifier: MIT
  4. #include <Jolt/Jolt.h>
  5. #include <Jolt/Core/JobSystemThreadPool.h>
  6. #include <Jolt/Core/Profiler.h>
  7. #include <Jolt/Core/FPException.h>
  8. #include <Jolt/Core/IncludeWindows.h>
  9. #ifdef JPH_PLATFORM_LINUX
  10. #include <sys/prctl.h>
  11. #endif
  12. JPH_NAMESPACE_BEGIN
  13. void JobSystemThreadPool::Init(uint inMaxJobs, uint inMaxBarriers, int inNumThreads)
  14. {
  15. JobSystemWithBarrier::Init(inMaxBarriers);
  16. // Init freelist of jobs
  17. mJobs.Init(inMaxJobs, inMaxJobs);
  18. // Init queue
  19. for (atomic<Job *> &j : mQueue)
  20. j = nullptr;
  21. // Start the worker threads
  22. StartThreads(inNumThreads);
  23. }
// Constructor: forwards all arguments unchanged to Init(), which performs the actual setup
// (barriers, job free list, queue) and starts the worker threads.
JobSystemThreadPool::JobSystemThreadPool(uint inMaxJobs, uint inMaxBarriers, int inNumThreads)
{
	Init(inMaxJobs, inMaxBarriers, inNumThreads);
}
  28. void JobSystemThreadPool::StartThreads([[maybe_unused]] int inNumThreads)
  29. {
  30. #if !defined(JPH_CPU_WASM) || defined(__EMSCRIPTEN_PTHREADS__) // If we're running without threads support we cannot create threads and we ignore the inNumThreads parameter
  31. // Auto detect number of threads
  32. if (inNumThreads < 0)
  33. inNumThreads = thread::hardware_concurrency() - 1;
  34. // If no threads are requested we're done
  35. if (inNumThreads == 0)
  36. return;
  37. // Don't quit the threads
  38. mQuit = false;
  39. // Allocate heads
  40. mHeads = reinterpret_cast<atomic<uint> *>(Allocate(sizeof(atomic<uint>) * inNumThreads));
  41. for (int i = 0; i < inNumThreads; ++i)
  42. mHeads[i] = 0;
  43. // Start running threads
  44. JPH_ASSERT(mThreads.empty());
  45. mThreads.reserve(inNumThreads);
  46. for (int i = 0; i < inNumThreads; ++i)
  47. mThreads.emplace_back([this, i] { ThreadMain(i); });
  48. #endif
  49. }
// Destructor: shuts the pool down through StopThreads(), which signals the worker threads to quit
// and waits for them to finish.
JobSystemThreadPool::~JobSystemThreadPool()
{
	// Stop all worker threads
	StopThreads();
}
  55. void JobSystemThreadPool::StopThreads()
  56. {
  57. if (mThreads.empty())
  58. return;
  59. // Signal threads that we want to stop and wake them up
  60. mQuit = true;
  61. mSemaphore.Release((uint)mThreads.size());
  62. // Wait for all threads to finish
  63. for (thread &t : mThreads)
  64. if (t.joinable())
  65. t.join();
  66. // Delete all threads
  67. mThreads.clear();
  68. // Ensure that there are no lingering jobs in the queue
  69. for (uint head = 0; head != mTail; ++head)
  70. {
  71. // Fetch job
  72. Job *job_ptr = mQueue[head & (cQueueLength - 1)].exchange(nullptr);
  73. if (job_ptr != nullptr)
  74. {
  75. // And execute it
  76. job_ptr->Execute();
  77. job_ptr->Release();
  78. }
  79. }
  80. // Destroy heads and reset tail
  81. Free(mHeads);
  82. mHeads = nullptr;
  83. mTail = 0;
  84. }
  85. JobHandle JobSystemThreadPool::CreateJob(const char *inJobName, ColorArg inColor, const JobFunction &inJobFunction, uint32 inNumDependencies)
  86. {
  87. JPH_PROFILE_FUNCTION();
  88. // Loop until we can get a job from the free list
  89. uint32 index;
  90. for (;;)
  91. {
  92. index = mJobs.ConstructObject(inJobName, inColor, this, inJobFunction, inNumDependencies);
  93. if (index != AvailableJobs::cInvalidObjectIndex)
  94. break;
  95. JPH_ASSERT(false, "No jobs available!");
  96. std::this_thread::sleep_for(std::chrono::microseconds(100));
  97. }
  98. Job *job = &mJobs.Get(index);
  99. // Construct handle to keep a reference, the job is queued below and may immediately complete
  100. JobHandle handle(job);
  101. // If there are no dependencies, queue the job now
  102. if (inNumDependencies == 0)
  103. QueueJob(job);
  104. // Return the handle
  105. return handle;
  106. }
// Destructs inJob through mJobs, the same free list that CreateJob constructs its jobs from.
void JobSystemThreadPool::FreeJob(Job *inJob)
{
	mJobs.DestructObject(inJob);
}
  111. uint JobSystemThreadPool::GetHead() const
  112. {
  113. // Find the minimal value across all threads
  114. uint head = mTail;
  115. for (size_t i = 0; i < mThreads.size(); ++i)
  116. head = min(head, mHeads[i].load());
  117. return head;
  118. }
  119. void JobSystemThreadPool::QueueJobInternal(Job *inJob)
  120. {
  121. // Add reference to job because we're adding the job to the queue
  122. inJob->AddRef();
  123. // Need to read head first because otherwise the tail can already have passed the head
  124. // We read the head outside of the loop since it involves iterating over all threads and we only need to update
  125. // it if there's not enough space in the queue.
  126. uint head = GetHead();
  127. for (;;)
  128. {
  129. // Check if there's space in the queue
  130. uint old_value = mTail;
  131. if (old_value - head >= cQueueLength)
  132. {
  133. // We calculated the head outside of the loop, update head (and we also need to update tail to prevent it from passing head)
  134. head = GetHead();
  135. old_value = mTail;
  136. // Second check if there's space in the queue
  137. if (old_value - head >= cQueueLength)
  138. {
  139. // Wake up all threads in order to ensure that they can clear any nullptrs they may not have processed yet
  140. mSemaphore.Release((uint)mThreads.size());
  141. // Sleep a little (we have to wait for other threads to update their head pointer in order for us to be able to continue)
  142. std::this_thread::sleep_for(std::chrono::microseconds(100));
  143. continue;
  144. }
  145. }
  146. // Write the job pointer if the slot is empty
  147. Job *expected_job = nullptr;
  148. bool success = mQueue[old_value & (cQueueLength - 1)].compare_exchange_strong(expected_job, inJob);
  149. // Regardless of who wrote the slot, we will update the tail (if the successful thread got scheduled out
  150. // after writing the pointer we still want to be able to continue)
  151. mTail.compare_exchange_strong(old_value, old_value + 1);
  152. // If we successfully added our job we're done
  153. if (success)
  154. break;
  155. }
  156. }
  157. void JobSystemThreadPool::QueueJob(Job *inJob)
  158. {
  159. JPH_PROFILE_FUNCTION();
  160. // If we have no worker threads, we can't queue the job either. We assume in this case that the job will be added to a barrier and that the barrier will execute the job when it's Wait() function is called.
  161. if (mThreads.empty())
  162. return;
  163. // Queue the job
  164. QueueJobInternal(inJob);
  165. // Wake up thread
  166. mSemaphore.Release();
  167. }
  168. void JobSystemThreadPool::QueueJobs(Job **inJobs, uint inNumJobs)
  169. {
  170. JPH_PROFILE_FUNCTION();
  171. JPH_ASSERT(inNumJobs > 0);
  172. // If we have no worker threads, we can't queue the job either. We assume in this case that the job will be added to a barrier and that the barrier will execute the job when it's Wait() function is called.
  173. if (mThreads.empty())
  174. return;
  175. // Queue all jobs
  176. for (Job **job = inJobs, **job_end = inJobs + inNumJobs; job < job_end; ++job)
  177. QueueJobInternal(*job);
  178. // Wake up threads
  179. mSemaphore.Release(min(inNumJobs, (uint)mThreads.size()));
  180. }
  181. #if defined(JPH_PLATFORM_WINDOWS)
  182. #if !defined(JPH_COMPILER_MINGW) // MinGW doesn't support __try/__except)
  183. // Sets the current thread name in MSVC debugger
  184. static void RaiseThreadNameException(const char *inName)
  185. {
  186. #pragma pack(push, 8)
  187. struct THREADNAME_INFO
  188. {
  189. DWORD dwType; // Must be 0x1000.
  190. LPCSTR szName; // Pointer to name (in user addr space).
  191. DWORD dwThreadID; // Thread ID (-1=caller thread).
  192. DWORD dwFlags; // Reserved for future use, must be zero.
  193. };
  194. #pragma pack(pop)
  195. THREADNAME_INFO info;
  196. info.dwType = 0x1000;
  197. info.szName = inName;
  198. info.dwThreadID = (DWORD)-1;
  199. info.dwFlags = 0;
  200. __try
  201. {
  202. RaiseException(0x406D1388, 0, sizeof(info) / sizeof(ULONG_PTR), (ULONG_PTR *)&info);
  203. }
  204. __except(EXCEPTION_EXECUTE_HANDLER)
  205. {
  206. }
  207. }
  208. #endif // !JPH_COMPILER_MINGW
  209. static void SetThreadName(const char* inName)
  210. {
  211. JPH_SUPPRESS_WARNING_PUSH
  212. // Suppress casting warning, it's fine here as GetProcAddress doesn't really return a FARPROC
  213. JPH_CLANG_SUPPRESS_WARNING("-Wcast-function-type") // error : cast from 'FARPROC' (aka 'long long (*)()') to 'SetThreadDescriptionFunc' (aka 'long (*)(void *, const wchar_t *)') converts to incompatible function type
  214. JPH_CLANG_SUPPRESS_WARNING("-Wcast-function-type-strict") // error : cast from 'FARPROC' (aka 'long long (*)()') to 'SetThreadDescriptionFunc' (aka 'long (*)(void *, const wchar_t *)') converts to incompatible function type
  215. JPH_MSVC_SUPPRESS_WARNING(4191) // reinterpret_cast' : unsafe conversion from 'FARPROC' to 'SetThreadDescriptionFunc'. Calling this function through the result pointer may cause your program to fail
  216. using SetThreadDescriptionFunc = HRESULT(WINAPI*)(HANDLE hThread, PCWSTR lpThreadDescription);
  217. static SetThreadDescriptionFunc SetThreadDescription = reinterpret_cast<SetThreadDescriptionFunc>(GetProcAddress(GetModuleHandleW(L"Kernel32.dll"), "SetThreadDescription"));
  218. JPH_SUPPRESS_WARNING_POP
  219. if (SetThreadDescription)
  220. {
  221. wchar_t name_buffer[64] = { 0 };
  222. if (MultiByteToWideChar(CP_UTF8, 0, inName, -1, name_buffer, sizeof(name_buffer) / sizeof(wchar_t) - 1) == 0)
  223. return;
  224. SetThreadDescription(GetCurrentThread(), name_buffer);
  225. }
  226. #if !defined(JPH_COMPILER_MINGW)
  227. else if (IsDebuggerPresent())
  228. RaiseThreadNameException(inName);
  229. #endif // !JPH_COMPILER_MINGW
  230. }
  231. #elif defined(JPH_PLATFORM_LINUX)
  232. static void SetThreadName(const char *inName)
  233. {
  234. JPH_ASSERT(strlen(inName) < 16); // String will be truncated if it is longer
  235. prctl(PR_SET_NAME, inName, 0, 0, 0);
  236. }
  237. #endif // JPH_PLATFORM_LINUX
  238. void JobSystemThreadPool::ThreadMain(int inThreadIndex)
  239. {
  240. // Name the thread
  241. char name[64];
  242. snprintf(name, sizeof(name), "Worker %d", int(inThreadIndex + 1));
  243. #if defined(JPH_PLATFORM_WINDOWS) || defined(JPH_PLATFORM_LINUX)
  244. SetThreadName(name);
  245. #endif // JPH_PLATFORM_WINDOWS && !JPH_COMPILER_MINGW
  246. // Enable floating point exceptions
  247. FPExceptionsEnable enable_exceptions;
  248. JPH_UNUSED(enable_exceptions);
  249. JPH_PROFILE_THREAD_START(name);
  250. // Call the thread init function
  251. mThreadInitFunction(inThreadIndex);
  252. atomic<uint> &head = mHeads[inThreadIndex];
  253. while (!mQuit)
  254. {
  255. // Wait for jobs
  256. mSemaphore.Acquire();
  257. {
  258. JPH_PROFILE("Executing Jobs");
  259. // Loop over the queue
  260. while (head != mTail)
  261. {
  262. // Exchange any job pointer we find with a nullptr
  263. atomic<Job *> &job = mQueue[head & (cQueueLength - 1)];
  264. if (job.load() != nullptr)
  265. {
  266. Job *job_ptr = job.exchange(nullptr);
  267. if (job_ptr != nullptr)
  268. {
  269. // And execute it
  270. job_ptr->Execute();
  271. job_ptr->Release();
  272. }
  273. }
  274. head++;
  275. }
  276. }
  277. }
  278. // Call the thread exit function
  279. mThreadExitFunction(inThreadIndex);
  280. JPH_PROFILE_THREAD_END();
  281. }
  282. JPH_NAMESPACE_END