ThreadPool.cpp 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. // Copyright (C) 2009-2016, Panagiotis Christopoulos Charitos and contributors.
  2. // All rights reserved.
  3. // Code licensed under the BSD License.
  4. // http://www.anki3d.org/LICENSE
  5. #include <anki/util/ThreadPool.h>
  6. #include <anki/util/Logger.h>
  7. #include <cstdlib>
  8. #include <new>
  9. namespace anki
  10. {
  11. //==============================================================================
  12. // ThreadPoolThread =
  13. //==============================================================================
  14. namespace detail
  15. {
  16. /// The thread that executes a ThreadPoolTask
  17. class ThreadPoolThread
  18. {
  19. public:
  20. U32 m_id; ///< An ID
  21. Thread m_thread; ///< Runs the workingFunc
  22. ThreadPoolTask* m_task; ///< Its NULL if there is no pending task
  23. ThreadPool* m_threadpool;
  24. Bool8 m_quit = false;
  25. /// Constructor
  26. ThreadPoolThread(U32 id, ThreadPool* threadpool)
  27. : m_id(id)
  28. , m_thread("anki_threadpool")
  29. , m_task(nullptr)
  30. , m_threadpool(threadpool)
  31. {
  32. ANKI_ASSERT(threadpool);
  33. m_thread.start(this, threadCallback);
  34. }
  35. private:
  36. /// Thread callaback
  37. static Error threadCallback(Thread::Info& info)
  38. {
  39. ThreadPoolThread& self =
  40. *reinterpret_cast<ThreadPoolThread*>(info.m_userData);
  41. Barrier& barrier = self.m_threadpool->m_barrier;
  42. const PtrSize threadCount = self.m_threadpool->getThreadsCount();
  43. Bool quit = false;
  44. while(!quit)
  45. {
  46. // Wait for something
  47. barrier.wait();
  48. quit = self.m_quit;
  49. // Exec
  50. Error err = (*self.m_task)(self.m_id, threadCount);
  51. if(err)
  52. {
  53. self.m_threadpool->m_err = err;
  54. }
  55. // Sync with main thread
  56. barrier.wait();
  57. }
  58. return ErrorCode::NONE;
  59. }
  60. };
  61. } // end namespace detail
  62. //==============================================================================
  63. // ThreadPool =
  64. //==============================================================================
  65. //==============================================================================
  // Definition of the static placeholder task. It is substituted when
  // assignNewTask() receives a null task and is handed to every worker by the
  // destructor when the threads are asked to quit.
  66. ThreadPool::DummyTask ThreadPool::m_dummyTask;
  67. //==============================================================================
  68. ThreadPool::ThreadPool(U32 threadsCount)
  69. : m_barrier(threadsCount + 1)
  70. {
  71. m_threadsCount = threadsCount;
  72. ANKI_ASSERT(m_threadsCount <= MAX_THREADS && m_threadsCount > 0);
  73. m_threads = reinterpret_cast<detail::ThreadPoolThread*>(
  74. malloc(sizeof(detail::ThreadPoolThread) * m_threadsCount));
  75. if(m_threads == nullptr)
  76. {
  77. ANKI_LOGF("Out of memory");
  78. }
  79. while(threadsCount-- != 0)
  80. {
  81. ::new(&m_threads[threadsCount])
  82. detail::ThreadPoolThread(threadsCount, this);
  83. }
  84. }
  85. //==============================================================================
  86. ThreadPool::~ThreadPool()
  87. {
  88. // Terminate threads
  89. U count = m_threadsCount;
  90. while(count-- != 0)
  91. {
  92. detail::ThreadPoolThread& thread = m_threads[count];
  93. thread.m_quit = true;
  94. thread.m_task = &m_dummyTask;
  95. }
  96. // Wakeup the threads
  97. m_barrier.wait();
  98. // Wait the threads
  99. m_barrier.wait();
  100. while(m_threadsCount-- != 0)
  101. {
  102. Error err = m_threads[m_threadsCount].m_thread.join();
  103. (void)err;
  104. m_threads[m_threadsCount].~ThreadPoolThread();
  105. }
  106. if(m_threads)
  107. {
  108. free(m_threads);
  109. }
  110. }
  111. //==============================================================================
  112. void ThreadPool::assignNewTask(U32 slot, ThreadPoolTask* task)
  113. {
  114. ANKI_ASSERT(slot < getThreadsCount());
  115. if(task == nullptr)
  116. {
  117. task = &m_dummyTask;
  118. }
  119. m_threads[slot].m_task = task;
  120. ++m_tasksAssigned;
  121. if(m_tasksAssigned == m_threadsCount)
  122. {
  123. // Last task is assigned. Wake all threads
  124. m_barrier.wait();
  125. }
  126. }
  127. } // end namespace anki