ThreadPool.cpp 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. // Copyright (C) 2009-2021, Panagiotis Christopoulos Charitos and contributors.
  2. // All rights reserved.
  3. // Code licensed under the BSD License.
  4. // http://www.anki3d.org/LICENSE
  5. #include <AnKi/Util/ThreadPool.h>
  6. #include <AnKi/Util/Logger.h>
  7. #include <cstdlib>
  8. #include <new>
  9. namespace anki
  10. {
  11. namespace detail
  12. {
  13. /// The thread that executes a ThreadPoolTask
  14. class ThreadPoolThread
  15. {
  16. public:
  17. U32 m_id; ///< An ID
  18. Thread m_thread; ///< Runs the workingFunc
  19. ThreadPoolTask* m_task; ///< Its NULL if there is no pending task
  20. ThreadPool* m_threadpool;
  21. Bool m_quit = false;
  22. /// Constructor
  23. ThreadPoolThread(U32 id, ThreadPool* threadpool, Bool pinToCore)
  24. : m_id(id)
  25. , m_thread("anki_threadpool")
  26. , m_task(nullptr)
  27. , m_threadpool(threadpool)
  28. {
  29. ANKI_ASSERT(threadpool);
  30. m_thread.start(this, threadCallback, ThreadCoreAffinityMask(false).set(m_id, pinToCore));
  31. }
  32. private:
  33. /// Thread callaback
  34. static Error threadCallback(ThreadCallbackInfo& info)
  35. {
  36. ThreadPoolThread& self = *static_cast<ThreadPoolThread*>(info.m_userData);
  37. Barrier& barrier = self.m_threadpool->m_barrier;
  38. const PtrSize threadCount = self.m_threadpool->getThreadCount();
  39. Bool quit = false;
  40. while(!quit)
  41. {
  42. // Wait for something
  43. barrier.wait();
  44. quit = self.m_quit;
  45. // Exec
  46. Error err = (*self.m_task)(self.m_id, threadCount);
  47. if(err)
  48. {
  49. self.m_threadpool->m_err = err;
  50. }
  51. // Sync with main thread
  52. barrier.wait();
  53. }
  54. return Error::NONE;
  55. }
  56. };
  57. } // end namespace detail
  58. ThreadPool::DummyTask ThreadPool::m_dummyTask;
  59. ThreadPool::ThreadPool(U32 threadCount, Bool pinToCores)
  60. : m_barrier(threadCount + 1)
  61. {
  62. m_threadsCount = threadCount;
  63. ANKI_ASSERT(m_threadsCount <= MAX_THREADS && m_threadsCount > 0);
  64. m_threads = static_cast<detail::ThreadPoolThread*>(malloc(sizeof(detail::ThreadPoolThread) * m_threadsCount));
  65. if(m_threads == nullptr)
  66. {
  67. ANKI_UTIL_LOGF("Out of memory");
  68. }
  69. while(threadCount-- != 0)
  70. {
  71. ::new(&m_threads[threadCount]) detail::ThreadPoolThread(threadCount, this, pinToCores);
  72. }
  73. }
  74. ThreadPool::~ThreadPool()
  75. {
  76. // Terminate threads
  77. U count = m_threadsCount;
  78. while(count-- != 0)
  79. {
  80. detail::ThreadPoolThread& thread = m_threads[count];
  81. thread.m_quit = true;
  82. thread.m_task = &m_dummyTask;
  83. }
  84. // Wakeup the threads
  85. m_barrier.wait();
  86. // Wait the threads
  87. m_barrier.wait();
  88. while(m_threadsCount-- != 0)
  89. {
  90. Error err = m_threads[m_threadsCount].m_thread.join();
  91. (void)err;
  92. m_threads[m_threadsCount].~ThreadPoolThread();
  93. }
  94. if(m_threads)
  95. {
  96. free(m_threads);
  97. }
  98. }
  99. void ThreadPool::assignNewTask(U32 slot, ThreadPoolTask* task)
  100. {
  101. ANKI_ASSERT(slot < getThreadCount());
  102. if(task == nullptr)
  103. {
  104. task = &m_dummyTask;
  105. }
  106. m_threads[slot].m_task = task;
  107. ++m_tasksAssigned;
  108. if(m_tasksAssigned == m_threadsCount)
  109. {
  110. // Last task is assigned. Wake all threads
  111. m_barrier.wait();
  112. }
  113. }
  114. } // end namespace anki