BsThreadPool.cpp 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. #include "BsThreadPool.h"
  2. namespace BansheeEngine
  3. {
  4. PooledThread::PooledThread(const String& name)
  5. :mName(name), mIdle(true), mThreadStarted(false),
  6. mThreadReady(false), mIdleTime(0)
  7. { }
  8. PooledThread::~PooledThread()
  9. { }
  10. void PooledThread::initialize()
  11. {
  12. BS_THREAD_CREATE(t, std::bind(&PooledThread::run, this));
  13. mThread = t;
  14. BS_LOCK_MUTEX_NAMED(mMutex, lock);
  15. while(!mThreadStarted)
  16. BS_THREAD_WAIT(mStartedCond, mMutex, lock);
  17. }
  18. void PooledThread::start(std::function<void()> workerMethod)
  19. {
  20. {
  21. BS_LOCK_MUTEX(mMutex);
  22. mWorkerMethod = workerMethod;
  23. mIdle = false;
  24. mIdleTime = std::time(nullptr);
  25. mThreadReady = true;
  26. }
  27. BS_THREAD_NOTIFY_ONE(mReadyCond);
  28. }
  29. void PooledThread::run()
  30. {
  31. onThreadStarted(mName);
  32. {
  33. BS_LOCK_MUTEX(mMutex);
  34. mThreadStarted = true;
  35. }
  36. BS_THREAD_NOTIFY_ONE(mStartedCond);
  37. while(true)
  38. {
  39. std::function<void()> worker = nullptr;
  40. {
  41. BS_LOCK_MUTEX_NAMED(mMutex, lock);
  42. while(!mThreadReady)
  43. BS_THREAD_WAIT(mReadyCond, mMutex, lock);
  44. if(mWorkerMethod == nullptr)
  45. {
  46. onThreadEnded(mName);
  47. return;
  48. }
  49. worker = mWorkerMethod;
  50. }
  51. worker();
  52. {
  53. BS_LOCK_MUTEX(mMutex);
  54. mIdle = true;
  55. mIdleTime = std::time(nullptr);
  56. mThreadReady = false;
  57. }
  58. }
  59. }
  60. void PooledThread::destroy()
  61. {
  62. {
  63. BS_LOCK_MUTEX(mMutex);
  64. mWorkerMethod = nullptr;
  65. mThreadReady = true;
  66. }
  67. BS_THREAD_NOTIFY_ONE(mReadyCond);
  68. BS_THREAD_JOIN((*mThread));
  69. BS_THREAD_DESTROY(mThread);
  70. }
  71. bool PooledThread::isIdle()
  72. {
  73. BS_LOCK_MUTEX(mMutex);
  74. return mIdle;
  75. }
  76. time_t PooledThread::idleTime()
  77. {
  78. BS_LOCK_MUTEX(mMutex);
  79. return (time(nullptr) - mIdleTime);
  80. }
  81. void PooledThread::setName(const String& name)
  82. {
  83. mName = name;
  84. }
  85. ThreadPool::ThreadPool(UINT32 threadCapacity, UINT32 maxCapacity, UINT32 idleTimeout)
  86. :mDefaultCapacity(threadCapacity), mMaxCapacity(maxCapacity), mIdleTimeout(idleTimeout), mAge(0)
  87. {
  88. }
  89. ThreadPool::~ThreadPool()
  90. {
  91. stopAll();
  92. }
  93. void ThreadPool::run(const String& name, std::function<void()> workerMethod)
  94. {
  95. getThread(name)->start(workerMethod);
  96. }
  97. void ThreadPool::stopAll()
  98. {
  99. BS_LOCK_MUTEX(mMutex);
  100. for(auto& thread : mThreads)
  101. {
  102. destroyThread(thread);
  103. }
  104. mThreads.clear();
  105. }
  106. void ThreadPool::clearUnused()
  107. {
  108. BS_LOCK_MUTEX(mMutex);
  109. mAge = 0;
  110. if(mThreads.size() <= mDefaultCapacity)
  111. return;
  112. Vector<PooledThread*> idleThreads;
  113. Vector<PooledThread*> expiredThreads;
  114. Vector<PooledThread*> activeThreads;
  115. idleThreads.reserve(mThreads.size());
  116. expiredThreads.reserve(mThreads.size());
  117. activeThreads.reserve(mThreads.size());
  118. for(auto& thread : mThreads)
  119. {
  120. if(thread->isIdle())
  121. {
  122. if(thread->idleTime() >= mIdleTimeout)
  123. expiredThreads.push_back(thread);
  124. else
  125. idleThreads.push_back(thread);
  126. }
  127. else
  128. activeThreads.push_back(thread);
  129. }
  130. idleThreads.insert(idleThreads.end(), expiredThreads.begin(), expiredThreads.end());
  131. UINT32 limit = std::min((UINT32)idleThreads.size(), mDefaultCapacity);
  132. UINT32 i = 0;
  133. mThreads.clear();
  134. for(auto& thread : idleThreads)
  135. {
  136. if(i < limit)
  137. mThreads.push_back(thread);
  138. else
  139. destroyThread(thread);
  140. }
  141. mThreads.insert(mThreads.end(), activeThreads.begin(), activeThreads.end());
  142. }
  143. void ThreadPool::destroyThread(PooledThread* thread)
  144. {
  145. thread->destroy();
  146. bs_delete(thread);
  147. }
  148. PooledThread* ThreadPool::getThread(const String& name)
  149. {
  150. UINT32 age = 0;
  151. {
  152. BS_LOCK_MUTEX(mMutex);
  153. age = ++mAge;
  154. }
  155. if(age == 32)
  156. clearUnused();
  157. PooledThread* newThread = nullptr;
  158. BS_LOCK_MUTEX(mMutex);
  159. for(auto& thread : mThreads)
  160. {
  161. if(thread->isIdle())
  162. {
  163. thread->setName(name);
  164. return thread;
  165. }
  166. }
  167. if(newThread == nullptr)
  168. {
  169. if(mThreads.size() >= mMaxCapacity)
  170. BS_EXCEPT(InvalidStateException, "Unable to create a new thread in the pool because maximum capacity has been reached.");
  171. newThread = createThread(name);
  172. mThreads.push_back(newThread);
  173. }
  174. return newThread;
  175. }
  176. UINT32 ThreadPool::getNumAvailable() const
  177. {
  178. UINT32 numAvailable = 0;
  179. BS_LOCK_MUTEX(mMutex);
  180. for(auto& thread : mThreads)
  181. {
  182. if(thread->isIdle())
  183. numAvailable++;
  184. }
  185. return numAvailable;
  186. }
  187. UINT32 ThreadPool::getNumActive() const
  188. {
  189. UINT32 numActive = 0;
  190. BS_LOCK_MUTEX(mMutex);
  191. for(auto& thread : mThreads)
  192. {
  193. if(!thread->isIdle())
  194. numActive++;
  195. }
  196. return numActive;
  197. }
  198. UINT32 ThreadPool::getNumAllocated() const
  199. {
  200. BS_LOCK_MUTEX(mMutex);
  201. return (UINT32)mThreads.size();
  202. }
  203. }