ThreadJobManager.cpp 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. // Copyright (C) 2009-present, Panagiotis Christopoulos Charitos and contributors.
  2. // All rights reserved.
  3. // Code licensed under the BSD License.
  4. // http://www.anki3d.org/LICENSE
  5. #include <AnKi/Util/ThreadJobManager.h>
  6. #include <AnKi/Util/String.h>
  7. namespace anki {
  8. class ThreadJobManager::WorkerThread
  9. {
  10. public:
  11. U32 m_id;
  12. Thread m_thread;
  13. ThreadJobManager* m_manager;
  14. WorkerThread(ThreadJobManager* manager, U32 id, Bool pinToCore, CString threadName)
  15. : m_id(id)
  16. , m_thread(threadName.cstr())
  17. , m_manager(manager)
  18. {
  19. m_thread.start(this, threadCallback, ThreadCoreAffinityMask(false).set(m_id, pinToCore));
  20. }
  21. static Error threadCallback(ThreadCallbackInfo& info)
  22. {
  23. WorkerThread& self = *static_cast<WorkerThread*>(info.m_userData);
  24. self.m_manager->threadRun(self.m_id);
  25. return Error::kNone;
  26. }
  27. };
  28. ThreadJobManager::ThreadJobManager(U32 threadCount, Bool pinToCores, U32 queueSize)
  29. {
  30. ANKI_ASSERT(threadCount);
  31. m_threads.resize(threadCount);
  32. for(U32 i = 0; i < threadCount; ++i)
  33. {
  34. String threadName;
  35. threadName.sprintf("JobManager#%u", i);
  36. m_threads[i] = newInstance<WorkerThread>(DefaultMemoryPool::getSingleton(), this, i, pinToCores, threadName);
  37. }
  38. m_tasks.resize(queueSize);
  39. }
  40. ThreadJobManager::~ThreadJobManager()
  41. {
  42. {
  43. LockGuard lock(m_mtx);
  44. m_quit = true;
  45. }
  46. m_cvar.notifyAll();
  47. for(WorkerThread* thread : m_threads)
  48. {
  49. [[maybe_unused]] const Error err = thread->m_thread.join();
  50. deleteInstance(DefaultMemoryPool::getSingleton(), thread);
  51. }
  52. }
  53. Bool ThreadJobManager::pushBackTask(const Func& func)
  54. {
  55. LockGuard lock(m_tasksMtx);
  56. const U32 next = (m_tasksBack + 1) % m_tasks.getSize();
  57. if(next != m_tasksFront)
  58. {
  59. m_tasks[m_tasksBack] = func;
  60. m_tasksBack = next;
  61. return true;
  62. }
  63. return false;
  64. }
  65. Bool ThreadJobManager::popFrontTask(Func& func)
  66. {
  67. LockGuard lock(m_tasksMtx);
  68. if(m_tasksBack != m_tasksFront)
  69. {
  70. func = m_tasks[m_tasksFront];
  71. m_tasksFront = (m_tasksFront + 1) % m_tasks.getSize();
  72. return true;
  73. }
  74. return false;
  75. }
  76. void ThreadJobManager::threadRun(U32 threadId)
  77. {
  78. while(true)
  79. {
  80. Func func;
  81. if(popFrontTask(func))
  82. {
  83. func(threadId);
  84. [[maybe_unused]] const U32 count = m_tasksInFlightCount.fetchSub(1);
  85. ANKI_ASSERT(count > 0);
  86. }
  87. else
  88. {
  89. LockGuard lock(m_mtx);
  90. if(m_quit)
  91. {
  92. break;
  93. }
  94. m_cvar.wait(m_mtx);
  95. }
  96. }
  97. }
  98. } // end namespace anki