JobSystemThreadPool.h 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. // Jolt Physics Library (https://github.com/jrouwe/JoltPhysics)
  2. // SPDX-FileCopyrightText: 2021 Jorrit Rouwe
  3. // SPDX-License-Identifier: MIT
  4. #pragma once
  5. #include <Jolt/Core/JobSystemWithBarrier.h>
  6. #include <Jolt/Core/FixedSizeFreeList.h>
  7. #include <Jolt/Core/Semaphore.h>
  8. JPH_SUPPRESS_WARNINGS_STD_BEGIN
  9. #include <thread>
  10. JPH_SUPPRESS_WARNINGS_STD_END
  11. JPH_NAMESPACE_BEGIN
  12. // Things we're using from STL
  13. using std::thread;
  14. /// Implementation of a JobSystem using a thread pool
  15. ///
  16. /// Note that this is considered an example implementation. It is expected that when you integrate
  17. /// the physics engine into your own project that you'll provide your own implementation of the
  18. /// JobSystem built on top of whatever job system your project uses.
  19. class JPH_EXPORT JobSystemThreadPool final : public JobSystemWithBarrier
  20. {
  21. public:
  22. JPH_OVERRIDE_NEW_DELETE
  23. /// Creates a thread pool.
  24. /// @see JobSystemThreadPool::Init
  25. JobSystemThreadPool(uint inMaxJobs, uint inMaxBarriers, int inNumThreads = -1);
  26. JobSystemThreadPool() = default;
  27. virtual ~JobSystemThreadPool() override;
  28. /// Functions to call when a thread is initialized or exits, must be set before calling Init()
  29. using InitExitFunction = function<void(int)>;
  30. void SetThreadInitFunction(const InitExitFunction &inInitFunction) { mThreadInitFunction = inInitFunction; }
  31. void SetThreadExitFunction(const InitExitFunction &inExitFunction) { mThreadExitFunction = inExitFunction; }
  32. /// Initialize the thread pool
  33. /// @param inMaxJobs Max number of jobs that can be allocated at any time
  34. /// @param inMaxBarriers Max number of barriers that can be allocated at any time
  35. /// @param inNumThreads Number of threads to start (the number of concurrent jobs is 1 more because the main thread will also run jobs while waiting for a barrier to complete). Use -1 to auto detect the amount of CPU's.
  36. void Init(uint inMaxJobs, uint inMaxBarriers, int inNumThreads = -1);
  37. // See JobSystem
  38. virtual int GetMaxConcurrency() const override { return int(mThreads.size()) + 1; }
  39. virtual JobHandle CreateJob(const char *inName, ColorArg inColor, const JobFunction &inJobFunction, uint32 inNumDependencies = 0) override;
  40. /// Change the max concurrency after initialization
  41. void SetNumThreads(int inNumThreads) { StopThreads(); StartThreads(inNumThreads); }
  42. protected:
  43. // See JobSystem
  44. virtual void QueueJob(Job *inJob) override;
  45. virtual void QueueJobs(Job **inJobs, uint inNumJobs) override;
  46. virtual void FreeJob(Job *inJob) override;
  47. private:
  48. /// Start/stop the worker threads
  49. void StartThreads(int inNumThreads);
  50. void StopThreads();
  51. /// Entry point for a thread
  52. void ThreadMain(int inThreadIndex);
  53. /// Get the head of the thread that has processed the least amount of jobs
  54. inline uint GetHead() const;
  55. /// Internal helper function to queue a job
  56. inline void QueueJobInternal(Job *inJob);
  57. /// Functions to call when initializing or exiting a thread
  58. InitExitFunction mThreadInitFunction = [](int) { };
  59. InitExitFunction mThreadExitFunction = [](int) { };
  60. /// Array of jobs (fixed size)
  61. using AvailableJobs = FixedSizeFreeList<Job>;
  62. AvailableJobs mJobs;
  63. /// Threads running jobs
  64. Array<thread> mThreads;
  65. // The job queue
  66. static constexpr uint32 cQueueLength = 1024;
  67. static_assert(IsPowerOf2(cQueueLength)); // We do bit operations and require queue length to be a power of 2
  68. atomic<Job *> mQueue[cQueueLength];
  69. // Head and tail of the queue, do this value modulo cQueueLength - 1 to get the element in the mQueue array
  70. atomic<uint> * mHeads = nullptr; ///< Per executing thread the head of the current queue
  71. alignas(JPH_CACHE_LINE_SIZE) atomic<uint> mTail = 0; ///< Tail (write end) of the queue
  72. // Semaphore used to signal worker threads that there is new work
  73. Semaphore mSemaphore;
  74. /// Boolean to indicate that we want to stop the job system
  75. atomic<bool> mQuit = false;
  76. };
  77. JPH_NAMESPACE_END