ThreadPool.cpp 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. // Copyright (C) 2009-present, Panagiotis Christopoulos Charitos and contributors.
  2. // All rights reserved.
  3. // Code licensed under the BSD License.
  4. // http://www.anki3d.org/LICENSE
  5. #include <AnKi/Util/ThreadPool.h>
  6. #include <AnKi/Util/Logger.h>
  7. #include <AnKi/Util/String.h>
  8. #include <cstdlib>
  9. #include <new>
  10. namespace anki {
  11. namespace detail {
  12. /// The thread that executes a ThreadPoolTask
  13. class ThreadPoolThread
  14. {
  15. public:
  16. U32 m_id; ///< An ID
  17. Thread m_thread; ///< Runs the workingFunc
  18. ThreadPoolTask* m_task; ///< Its NULL if there is no pending task
  19. ThreadPool* m_threadpool;
  20. Bool m_quit = false;
  21. /// Constructor
  22. ThreadPoolThread(U32 id, ThreadPool* threadpool, Bool pinToCore, CString threadName)
  23. : m_id(id)
  24. , m_thread(threadName.cstr())
  25. , m_task(nullptr)
  26. , m_threadpool(threadpool)
  27. {
  28. ANKI_ASSERT(threadpool);
  29. m_thread.start(this, threadCallback, ThreadCoreAffinityMask(false).set(m_id, pinToCore));
  30. }
  31. private:
  32. /// Thread callaback
  33. static Error threadCallback(ThreadCallbackInfo& info)
  34. {
  35. ThreadPoolThread& self = *static_cast<ThreadPoolThread*>(info.m_userData);
  36. Barrier& barrier = self.m_threadpool->m_barrier;
  37. const PtrSize threadCount = self.m_threadpool->getThreadCount();
  38. Bool quit = false;
  39. while(!quit)
  40. {
  41. // Wait for something
  42. barrier.wait();
  43. quit = self.m_quit;
  44. // Exec
  45. Error err = (*self.m_task)(self.m_id, threadCount);
  46. if(err)
  47. {
  48. self.m_threadpool->m_err = err;
  49. }
  50. // Sync with main thread
  51. barrier.wait();
  52. }
  53. return Error::kNone;
  54. }
  55. };
  56. } // end namespace detail
  57. ThreadPool::DummyTask ThreadPool::m_dummyTask;
  58. ThreadPool::ThreadPool(U32 threadCount, Bool pinToCores)
  59. : m_barrier(threadCount + 1)
  60. {
  61. m_threadsCount = threadCount;
  62. ANKI_ASSERT(m_threadsCount <= kMaxThreads && m_threadsCount > 0);
  63. m_threads = static_cast<detail::ThreadPoolThread*>(malloc(sizeof(detail::ThreadPoolThread) * m_threadsCount));
  64. if(m_threads == nullptr)
  65. {
  66. ANKI_UTIL_LOGF("Out of memory");
  67. }
  68. for(U32 i = 0; i < threadCount; ++i)
  69. {
  70. Array<Char, 64> threadName;
  71. snprintf(&threadName[0], threadName.getSize(), "ThreadPool#%u", i);
  72. ::new(&m_threads[i]) detail::ThreadPoolThread(i, this, pinToCores, &threadName[0]);
  73. }
  74. }
  75. ThreadPool::~ThreadPool()
  76. {
  77. // Terminate threads
  78. U count = m_threadsCount;
  79. while(count-- != 0)
  80. {
  81. detail::ThreadPoolThread& thread = m_threads[count];
  82. thread.m_quit = true;
  83. thread.m_task = &m_dummyTask;
  84. }
  85. // Wakeup the threads
  86. m_barrier.wait();
  87. // Wait the threads
  88. m_barrier.wait();
  89. while(m_threadsCount-- != 0)
  90. {
  91. [[maybe_unused]] const Error err = m_threads[m_threadsCount].m_thread.join();
  92. m_threads[m_threadsCount].~ThreadPoolThread();
  93. }
  94. if(m_threads)
  95. {
  96. free(m_threads);
  97. }
  98. }
  99. void ThreadPool::assignNewTask(U32 slot, ThreadPoolTask* task)
  100. {
  101. ANKI_ASSERT(slot < getThreadCount());
  102. if(task == nullptr)
  103. {
  104. task = &m_dummyTask;
  105. }
  106. m_threads[slot].m_task = task;
  107. ++m_tasksAssigned;
  108. if(m_tasksAssigned == m_threadsCount)
  109. {
  110. // Last task is assigned. Wake all threads
  111. m_barrier.wait();
  112. }
  113. }
  114. } // end namespace anki