ThreadPool.cpp 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. // Copyright (C) 2009-2021, Panagiotis Christopoulos Charitos and contributors.
  2. // All rights reserved.
  3. // Code licensed under the BSD License.
  4. // http://www.anki3d.org/LICENSE
  5. #include <AnKi/Util/ThreadPool.h>
  6. #include <AnKi/Util/Logger.h>
  7. #include <cstdlib>
  8. #include <new>
  9. namespace anki {
  10. namespace detail {
  11. /// The thread that executes a ThreadPoolTask
  12. class ThreadPoolThread
  13. {
  14. public:
  15. U32 m_id; ///< An ID
  16. Thread m_thread; ///< Runs the workingFunc
  17. ThreadPoolTask* m_task; ///< Its NULL if there is no pending task
  18. ThreadPool* m_threadpool;
  19. Bool m_quit = false;
  20. /// Constructor
  21. ThreadPoolThread(U32 id, ThreadPool* threadpool, Bool pinToCore)
  22. : m_id(id)
  23. , m_thread("anki_threadpool")
  24. , m_task(nullptr)
  25. , m_threadpool(threadpool)
  26. {
  27. ANKI_ASSERT(threadpool);
  28. m_thread.start(this, threadCallback, ThreadCoreAffinityMask(false).set(m_id, pinToCore));
  29. }
  30. private:
  31. /// Thread callaback
  32. static Error threadCallback(ThreadCallbackInfo& info)
  33. {
  34. ThreadPoolThread& self = *static_cast<ThreadPoolThread*>(info.m_userData);
  35. Barrier& barrier = self.m_threadpool->m_barrier;
  36. const PtrSize threadCount = self.m_threadpool->getThreadCount();
  37. Bool quit = false;
  38. while(!quit)
  39. {
  40. // Wait for something
  41. barrier.wait();
  42. quit = self.m_quit;
  43. // Exec
  44. Error err = (*self.m_task)(self.m_id, threadCount);
  45. if(err)
  46. {
  47. self.m_threadpool->m_err = err;
  48. }
  49. // Sync with main thread
  50. barrier.wait();
  51. }
  52. return Error::NONE;
  53. }
  54. };
  55. } // end namespace detail
  56. ThreadPool::DummyTask ThreadPool::m_dummyTask;
  57. ThreadPool::ThreadPool(U32 threadCount, Bool pinToCores)
  58. : m_barrier(threadCount + 1)
  59. {
  60. m_threadsCount = threadCount;
  61. ANKI_ASSERT(m_threadsCount <= MAX_THREADS && m_threadsCount > 0);
  62. m_threads = static_cast<detail::ThreadPoolThread*>(malloc(sizeof(detail::ThreadPoolThread) * m_threadsCount));
  63. if(m_threads == nullptr)
  64. {
  65. ANKI_UTIL_LOGF("Out of memory");
  66. }
  67. while(threadCount-- != 0)
  68. {
  69. ::new(&m_threads[threadCount]) detail::ThreadPoolThread(threadCount, this, pinToCores);
  70. }
  71. }
  72. ThreadPool::~ThreadPool()
  73. {
  74. // Terminate threads
  75. U count = m_threadsCount;
  76. while(count-- != 0)
  77. {
  78. detail::ThreadPoolThread& thread = m_threads[count];
  79. thread.m_quit = true;
  80. thread.m_task = &m_dummyTask;
  81. }
  82. // Wakeup the threads
  83. m_barrier.wait();
  84. // Wait the threads
  85. m_barrier.wait();
  86. while(m_threadsCount-- != 0)
  87. {
  88. Error err = m_threads[m_threadsCount].m_thread.join();
  89. (void)err;
  90. m_threads[m_threadsCount].~ThreadPoolThread();
  91. }
  92. if(m_threads)
  93. {
  94. free(m_threads);
  95. }
  96. }
  97. void ThreadPool::assignNewTask(U32 slot, ThreadPoolTask* task)
  98. {
  99. ANKI_ASSERT(slot < getThreadCount());
  100. if(task == nullptr)
  101. {
  102. task = &m_dummyTask;
  103. }
  104. m_threads[slot].m_task = task;
  105. ++m_tasksAssigned;
  106. if(m_tasksAssigned == m_threadsCount)
  107. {
  108. // Last task is assigned. Wake all threads
  109. m_barrier.wait();
  110. }
  111. }
  112. } // end namespace anki