BsThreadPool.cpp 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. //__________________________ Banshee Project - A modern game development toolkit _________________________________//
  2. //_____________________________________ www.banshee-project.com __________________________________________________//
  3. //________________________ Copyright (c) 2014 Marko Pintera. All rights reserved. ________________________________//
  4. #include "BsThreadPool.h"
  5. namespace BansheeEngine
  6. {
  7. HThread::HThread()
  8. :mPool(nullptr), mThreadId(0)
  9. { }
  10. HThread::HThread(ThreadPool* pool, UINT32 threadId)
  11. :mPool(pool), mThreadId(threadId)
  12. { }
  13. void HThread::blockUntilComplete()
  14. {
  15. PooledThread* parentThread = nullptr;
  16. {
  17. BS_LOCK_MUTEX(mPool->mMutex);
  18. for (auto& thread : mPool->mThreads)
  19. {
  20. if (thread->getId() == mThreadId)
  21. {
  22. parentThread = thread;
  23. break;
  24. }
  25. }
  26. }
  27. if (parentThread != nullptr)
  28. {
  29. BS_LOCK_MUTEX_NAMED(parentThread->mMutex, lock);
  30. if (parentThread->mId == mThreadId) // Check again in case it changed
  31. {
  32. while (!parentThread->mIdle)
  33. BS_THREAD_WAIT(parentThread->mWorkerEndedCond, parentThread->mMutex, lock);
  34. }
  35. }
  36. }
  37. PooledThread::PooledThread(const String& name)
  38. :mName(name), mIdle(true), mThreadStarted(false),
  39. mThreadReady(false), mIdleTime(0), mId(0)
  40. { }
  41. PooledThread::~PooledThread()
  42. { }
  43. void PooledThread::initialize()
  44. {
  45. BS_THREAD_CREATE(t, std::bind(&PooledThread::run, this));
  46. mThread = t;
  47. BS_LOCK_MUTEX_NAMED(mMutex, lock);
  48. while(!mThreadStarted)
  49. BS_THREAD_WAIT(mStartedCond, mMutex, lock);
  50. }
  51. void PooledThread::start(std::function<void()> workerMethod, UINT32 id)
  52. {
  53. {
  54. BS_LOCK_MUTEX(mMutex);
  55. mWorkerMethod = workerMethod;
  56. mIdle = false;
  57. mIdleTime = std::time(nullptr);
  58. mThreadReady = true;
  59. mId = id;
  60. }
  61. BS_THREAD_NOTIFY_ONE(mReadyCond);
  62. }
  63. void PooledThread::run()
  64. {
  65. onThreadStarted(mName);
  66. {
  67. BS_LOCK_MUTEX(mMutex);
  68. mThreadStarted = true;
  69. }
  70. BS_THREAD_NOTIFY_ONE(mStartedCond);
  71. while(true)
  72. {
  73. std::function<void()> worker = nullptr;
  74. {
  75. BS_LOCK_MUTEX_NAMED(mMutex, lock);
  76. while(!mThreadReady)
  77. BS_THREAD_WAIT(mReadyCond, mMutex, lock);
  78. if(mWorkerMethod == nullptr)
  79. {
  80. onThreadEnded(mName);
  81. return;
  82. }
  83. worker = mWorkerMethod;
  84. }
  85. worker();
  86. {
  87. BS_LOCK_MUTEX(mMutex);
  88. mIdle = true;
  89. mIdleTime = std::time(nullptr);
  90. mThreadReady = false;
  91. BS_THREAD_NOTIFY_ONE(mWorkerEndedCond);
  92. }
  93. }
  94. }
  95. void PooledThread::destroy()
  96. {
  97. {
  98. BS_LOCK_MUTEX(mMutex);
  99. mWorkerMethod = nullptr;
  100. mThreadReady = true;
  101. }
  102. BS_THREAD_NOTIFY_ONE(mReadyCond);
  103. BS_THREAD_JOIN((*mThread));
  104. BS_THREAD_DESTROY(mThread);
  105. }
  106. bool PooledThread::isIdle()
  107. {
  108. BS_LOCK_MUTEX(mMutex);
  109. return mIdle;
  110. }
  111. time_t PooledThread::idleTime()
  112. {
  113. BS_LOCK_MUTEX(mMutex);
  114. return (time(nullptr) - mIdleTime);
  115. }
  116. void PooledThread::setName(const String& name)
  117. {
  118. mName = name;
  119. }
  120. UINT32 PooledThread::getId() const
  121. {
  122. BS_LOCK_MUTEX(mMutex);
  123. return mId;
  124. }
  125. ThreadPool::ThreadPool(UINT32 threadCapacity, UINT32 maxCapacity, UINT32 idleTimeout)
  126. :mDefaultCapacity(threadCapacity), mMaxCapacity(maxCapacity), mIdleTimeout(idleTimeout), mAge(0)
  127. {
  128. }
  129. ThreadPool::~ThreadPool()
  130. {
  131. stopAll();
  132. }
  133. HThread ThreadPool::run(const String& name, std::function<void()> workerMethod)
  134. {
  135. PooledThread* thread = getThread(name);
  136. thread->start(workerMethod, mUniqueId++);
  137. return HThread(this, thread->getId());
  138. }
  139. void ThreadPool::stopAll()
  140. {
  141. BS_LOCK_MUTEX(mMutex);
  142. for(auto& thread : mThreads)
  143. {
  144. destroyThread(thread);
  145. }
  146. mThreads.clear();
  147. }
  148. void ThreadPool::clearUnused()
  149. {
  150. BS_LOCK_MUTEX(mMutex);
  151. mAge = 0;
  152. if(mThreads.size() <= mDefaultCapacity)
  153. return;
  154. Vector<PooledThread*> idleThreads;
  155. Vector<PooledThread*> expiredThreads;
  156. Vector<PooledThread*> activeThreads;
  157. idleThreads.reserve(mThreads.size());
  158. expiredThreads.reserve(mThreads.size());
  159. activeThreads.reserve(mThreads.size());
  160. for(auto& thread : mThreads)
  161. {
  162. if(thread->isIdle())
  163. {
  164. if(thread->idleTime() >= mIdleTimeout)
  165. expiredThreads.push_back(thread);
  166. else
  167. idleThreads.push_back(thread);
  168. }
  169. else
  170. activeThreads.push_back(thread);
  171. }
  172. idleThreads.insert(idleThreads.end(), expiredThreads.begin(), expiredThreads.end());
  173. UINT32 limit = std::min((UINT32)idleThreads.size(), mDefaultCapacity);
  174. UINT32 i = 0;
  175. mThreads.clear();
  176. for(auto& thread : idleThreads)
  177. {
  178. if(i < limit)
  179. mThreads.push_back(thread);
  180. else
  181. destroyThread(thread);
  182. }
  183. mThreads.insert(mThreads.end(), activeThreads.begin(), activeThreads.end());
  184. }
  185. void ThreadPool::destroyThread(PooledThread* thread)
  186. {
  187. thread->destroy();
  188. bs_delete(thread);
  189. }
  190. PooledThread* ThreadPool::getThread(const String& name)
  191. {
  192. UINT32 age = 0;
  193. {
  194. BS_LOCK_MUTEX(mMutex);
  195. age = ++mAge;
  196. }
  197. if(age == 32)
  198. clearUnused();
  199. PooledThread* newThread = nullptr;
  200. BS_LOCK_MUTEX(mMutex);
  201. for(auto& thread : mThreads)
  202. {
  203. if(thread->isIdle())
  204. {
  205. thread->setName(name);
  206. return thread;
  207. }
  208. }
  209. if(newThread == nullptr)
  210. {
  211. if(mThreads.size() >= mMaxCapacity)
  212. BS_EXCEPT(InvalidStateException, "Unable to create a new thread in the pool because maximum capacity has been reached.");
  213. newThread = createThread(name);
  214. mThreads.push_back(newThread);
  215. }
  216. return newThread;
  217. }
  218. UINT32 ThreadPool::getNumAvailable() const
  219. {
  220. UINT32 numAvailable = 0;
  221. BS_LOCK_MUTEX(mMutex);
  222. for(auto& thread : mThreads)
  223. {
  224. if(thread->isIdle())
  225. numAvailable++;
  226. }
  227. return numAvailable;
  228. }
  229. UINT32 ThreadPool::getNumActive() const
  230. {
  231. UINT32 numActive = 0;
  232. BS_LOCK_MUTEX(mMutex);
  233. for(auto& thread : mThreads)
  234. {
  235. if(!thread->isIdle())
  236. numActive++;
  237. }
  238. return numActive;
  239. }
  240. UINT32 ThreadPool::getNumAllocated() const
  241. {
  242. BS_LOCK_MUTEX(mMutex);
  243. return (UINT32)mThreads.size();
  244. }
  245. }